Completed
Push — master ( 558fdd...a80a7b )
by Olivier
02:58
created

opcua.MessageChunk.__str__()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125
Metric Value
dl 0
loc 3
ccs 1
cts 2
cp 0.5
rs 10
cc 1
crap 1.125
1 1
import struct
2 1
import logging
3 1
import hashlib
4
5 1
import opcua.uaprotocol_auto as auto
6 1
import opcua.uatypes as uatypes
7 1
from opcua.uatypes import uatype_UInt32
8 1
import opcua.utils as utils
9 1
from opcua.object_ids import ObjectIds
10 1
from opcua.attribute_ids import AttributeIds
11
12 1
logger = logging.getLogger('opcua.uaprotocol')
13
14 1
OPC_TCP_SCHEME = 'opc.tcp'
15
16 1
class AccessLevelMask(object):
17
    """
18
    used by AccessLevel and UserAccessLevel
19
    """
20 1
    CurrentRead = 0
21 1
    CurrentWrite = 1
22 1
    HistoryRead = 2
23 1
    HistoryWrite = 3
24 1
    SemanticChange = 4
25
26
27 1
class Hello(uatypes.FrozenClass):
28
29 1
    def __init__(self):
30 1
        self.ProtocolVersion = 0
31 1
        self.ReceiveBufferSize = 65536
32 1
        self.SendBufferSize = 65536
33 1
        self.MaxMessageSize = 0
34 1
        self.MaxChunkCount = 0
35 1
        self.EndpointUrl = ""
36 1
        self._freeze = True
37
38 1
    def to_binary(self):
39 1
        b = []
40 1
        b.append(uatype_UInt32.pack(self.ProtocolVersion))
41 1
        b.append(uatype_UInt32.pack(self.ReceiveBufferSize))
42 1
        b.append(uatype_UInt32.pack(self.SendBufferSize))
43 1
        b.append(uatype_UInt32.pack(self.MaxMessageSize))
44 1
        b.append(uatype_UInt32.pack(self.MaxChunkCount))
45 1
        b.append(uatypes.pack_string(self.EndpointUrl))
46 1
        return b"".join(b)
47
48 1
    @staticmethod
49
    def from_binary(data):
50 1
        hello = Hello()
51 1
        hello.ProtocolVersion = uatype_UInt32.unpack(data.read(4))[0]
52 1
        hello.ReceiveBufferSize = uatype_UInt32.unpack(data.read(4))[0]
53 1
        hello.SendBufferSize = uatype_UInt32.unpack(data.read(4))[0]
54 1
        hello.MaxMessageSize = uatype_UInt32.unpack(data.read(4))[0]
55 1
        hello.MaxChunkCount = uatype_UInt32.unpack(data.read(4))[0]
56 1
        hello.EndpointUrl = uatypes.unpack_string(data)
57 1
        return hello
58
59
60 1
class MessageType(object):
61 1
    Invalid = b"INV"  # FIXME: check value
62 1
    Hello = b"HEL"
63 1
    Acknowledge = b"ACK"
64 1
    Error = b"ERR"
65 1
    SecureOpen = b"OPN"
66 1
    SecureClose = b"CLO"
67 1
    SecureMessage = b"MSG"
68
69
70 1
class ChunkType(object):
71 1
    Invalid = b"0"  # FIXME check
72 1
    Single = b"F"
73 1
    Intermediate = b"C"
74 1
    Abort = b"A"    # when an error occurred and the Message is aborted (body is ErrorMessage)
75
76
77 1
class Header(uatypes.FrozenClass):
78
79 1
    def __init__(self, msgType=None, chunkType=None, channelid=0):
80 1
        self.MessageType = msgType
81 1
        self.ChunkType = chunkType
82 1
        self.ChannelId = channelid
83 1
        self.body_size = 0
84 1
        self.packet_size = 0
85 1
        self._freeze = True
86
87 1
    def add_size(self, size):
88 1
        self.body_size += size
89
90 1
    def to_binary(self):
91 1
        b = []
92 1
        b.append(struct.pack("<3ss", self.MessageType, self.ChunkType))
93 1
        size = self.body_size + 8
94 1
        if self.MessageType in (MessageType.SecureOpen, MessageType.SecureClose, MessageType.SecureMessage):
95 1
            size += 4
96 1
        b.append(uatype_UInt32.pack(size))
97 1
        if self.MessageType in (MessageType.SecureOpen, MessageType.SecureClose, MessageType.SecureMessage):
98 1
            b.append(uatype_UInt32.pack(self.ChannelId))
99 1
        return b"".join(b)
100
101 1
    @staticmethod
102
    def from_string(data):
103 1
        hdr = Header()
104 1
        hdr.MessageType, hdr.ChunkType, hdr.packet_size = struct.unpack("<3scI", data.read(8))
105 1
        hdr.body_size = hdr.packet_size - 8
106 1
        if hdr.MessageType in (MessageType.SecureOpen, MessageType.SecureClose, MessageType.SecureMessage):
107 1
            hdr.body_size -= 4
108 1
            hdr.ChannelId = uatype_UInt32.unpack(data.read(4))[0]
109 1
        return hdr
110
111 1
    @staticmethod
112
    def max_size():
113 1
        return struct.calcsize("<3scII")
114
115 1
    def __str__(self):
116
        return "Header(type:{}, chunk_type:{}, body_size:{}, channel:{})".format(self.MessageType, self.ChunkType, self.body_size, self.ChannelId)
117 1
    __repr__ = __str__
118
119
120 1
class ErrorMessage(uatypes.FrozenClass):
121
122 1
    def __init__(self):
123
        self.Error = uatypes.StatusCode()
124
        self.Reason = ""
125
        self._freeze = True
126
127 1
    def to_binary(self):
128
        b = []
129
        b.append(self.Error.to_binary())
130
        b.append(uatypes.pack_string(self.Reason))
131
        return b"".join(b)
132
133 1
    @staticmethod
134
    def from_binary(data):
135
        ack = ErrorMessage()
136
        ack.Error = uatypes.StatusCode.from_binary(data)
137
        ack.Reason = uatypes.unpack_string(data)
138
        return ack
139
140 1
    def __str__(self):
141
        return "MessageAbort(error:{}, reason:{})".format(self.Error, self.Reason)
142 1
    __repr__ = __str__
143
144
145 1
class Acknowledge(uatypes.FrozenClass):
146
147 1
    def __init__(self):
148 1
        self.ProtocolVersion = 0
149 1
        self.ReceiveBufferSize = 65536
150 1
        self.SendBufferSize = 65536
151 1
        self.MaxMessageSize = 0  # No limits
152 1
        self.MaxChunkCount = 0  # No limits
153 1
        self._freeze = True
154
155 1
    def to_binary(self):
156 1
        return struct.pack(
157
            "<5I",
158
            self.ProtocolVersion,
159
            self.ReceiveBufferSize,
160
            self.SendBufferSize,
161
            self.MaxMessageSize,
162
            self.MaxChunkCount)
163
164 1
    @staticmethod
165
    def from_binary(data):
166 1
        ack = Acknowledge()
167 1
        ack.ProtocolVersion, ack.ReceiveBufferSize, ack.SendBufferSize, ack.MaxMessageSize, ack.MaxChunkCount \
168
            = struct.unpack("<5I", data.read(20))
169 1
        return ack
170
171
172 1
class AsymmetricAlgorithmHeader(uatypes.FrozenClass):
173
174 1
    def __init__(self):
175 1
        self.SecurityPolicyURI = "http://opcfoundation.org/UA/SecurityPolicy#None"
176 1
        self.SenderCertificate = b""
177 1
        self.ReceiverCertificateThumbPrint = b""
178 1
        self._freeze = True
179
180 1
    def to_binary(self):
181 1
        b = []
182 1
        b.append(uatypes.pack_string(self.SecurityPolicyURI))
183 1
        b.append(uatypes.pack_string(self.SenderCertificate))
184 1
        b.append(uatypes.pack_string(self.ReceiverCertificateThumbPrint))
185 1
        return b"".join(b)
186
187 1
    @staticmethod
188
    def from_binary(data):
189 1
        hdr = AsymmetricAlgorithmHeader()
190 1
        hdr.SecurityPolicyURI = uatypes.unpack_bytes(data)
191 1
        hdr.SenderCertificate = uatypes.unpack_bytes(data)
192 1
        hdr.ReceiverCertificateThumbPrint = uatypes.unpack_bytes(data)
193 1
        return hdr
194
195 1
    def __str__(self):
196
        return "{}(SecurytyPolicy:{}, certificatesize:{}, receiverCertificatesize:{} )".format(self.__class__.__name__, self.SecurityPolicyURI, len(self.SenderCertificate), len(self.ReceiverCertificateThumbPrint))
197 1
    __repr__ = __str__
198
199
200 1
class SymmetricAlgorithmHeader(uatypes.FrozenClass):
201
202 1
    def __init__(self):
203 1
        self.TokenId = 0
204 1
        self._freeze = True
205
206 1
    @staticmethod
207
    def from_binary(data):
208 1
        obj = SymmetricAlgorithmHeader()
209 1
        obj.TokenId = uatype_UInt32.unpack(data.read(4))[0]
210 1
        return obj
211
212 1
    def to_binary(self):
213 1
        return uatype_UInt32.pack(self.TokenId)
214
215 1
    @staticmethod
216
    def max_size():
217 1
        return struct.calcsize("<I")
218
219 1
    def __str__(self):
220
        return "{}(TokenId:{} )".format(self.__class__.__name__, self.TokenId)
221 1
    __repr__ = __str__
222
223
224 1
class SequenceHeader(uatypes.FrozenClass):
225
226 1
    def __init__(self):
227 1
        self.SequenceNumber = None
228 1
        self.RequestId = None
229 1
        self._freeze = True
230
231 1
    @staticmethod
232
    def from_binary(data):
233 1
        obj = SequenceHeader()
234 1
        obj.SequenceNumber = uatype_UInt32.unpack(data.read(4))[0]
235 1
        obj.RequestId = uatype_UInt32.unpack(data.read(4))[0]
236 1
        return obj
237
238 1
    def to_binary(self):
239 1
        b = []
240 1
        b.append(uatype_UInt32.pack(self.SequenceNumber))
241 1
        b.append(uatype_UInt32.pack(self.RequestId))
242 1
        return b"".join(b)
243
244 1
    @staticmethod
245
    def max_size():
246 1
        return struct.calcsize("<II")
247
248 1
    def __str__(self):
249
        return "{}(SequenceNumber:{}, RequestId:{} )".format(self.__class__.__name__, self.SequenceNumber, self.RequestId)
250 1
    __repr__ = __str__
251
252
253 1
class CryptographyNone:
254
    """
255
    Base class for symmetric/asymmetric cryprography
256
    """
257 1
    def __init__(self, mode=auto.MessageSecurityMode.None_):
258 1
        pass
259
260 1
    def plain_block_size(self):
261
        """
262
        Size of plain text block for block cipher.
263
        """
264 1
        return 1
265
266 1
    def encrypted_block_size(self):
267
        """
268
        Size of encrypted text block for block cipher.
269
        """
270 1
        return 1
271
272 1
    def padding(self, size):
273
        """
274
        Create padding for a block of given size.
275
        plain_size = size + len(padding) + signature_size()
276
        plain_size = N * plain_block_size()
277
        """
278 1
        return b''
279
280 1
    def min_padding_size(self):
281 1
        return 0
282
283 1
    def signature_size(self):
284 1
        return 0
285
286 1
    def signature(self, data):
287 1
        return b''
288
289 1
    def encrypt(self, data):
290 1
        return data
291
292 1
    def decrypt(self, data):
293 1
        return data
294
295 1
    def vsignature_size(self):
296 1
        return 0
297
298 1
    def verify(self, data, signature):
299
        """
300
        Verify signature and raise exception if signature is invalid
301
        """
302
        pass
303
304 1
    def remove_padding(self, data):
305 1
        return data
306
307
308 1
class SecurityPolicy:
309
    """
310
    Base class for security policy
311
    """
312 1
    def __init__(self):
313 1
        self.asymmetric_cryptography = CryptographyNone()
314 1
        self.symmetric_cryptography = CryptographyNone()
315 1
        self.Mode = auto.MessageSecurityMode.None_
316 1
        self.URI = "http://opcfoundation.org/UA/SecurityPolicy#None"
317 1
        self.server_certificate = b""
318 1
        self.client_certificate = b""
319
320 1
    def symmetric_key_size(self):
321
        return 0
322
323 1
    def make_symmetric_key(self, a, b):
324
        pass
325
326
327 1
class MessageChunk(uatypes.FrozenClass):
328
    """
329
    Message Chunk, as described in OPC UA specs Part 6, 6.7.2.
330
    """
331 1
    def __init__(self, security_policy, body=b'', msg_type=MessageType.SecureMessage, chunk_type=ChunkType.Single):
332 1
        self.MessageHeader = Header(msg_type, chunk_type)
333 1
        if msg_type in (MessageType.SecureMessage, MessageType.SecureClose):
334 1
            self.SecurityHeader = SymmetricAlgorithmHeader()
335 1
        elif msg_type == MessageType.SecureOpen:
336 1
            self.SecurityHeader = AsymmetricAlgorithmHeader()
337
        else:
338
            raise Exception("Unsupported message type: {}".format(msg_type))
339 1
        self.SequenceHeader = SequenceHeader()
340 1
        self.Body = body
341 1
        self._security_policy = security_policy
342
343 1
    @staticmethod
344
    def from_binary(security_policy, data):
345 1
        h = Header.from_string(data)
346 1
        return MessageChunk.from_header_and_body(security_policy, h, data)
347
348 1
    @staticmethod
349
    def from_header_and_body(security_policy, header, data):
350 1
        if header.MessageType in (MessageType.SecureMessage, MessageType.SecureClose):
351 1
            security_header = SymmetricAlgorithmHeader.from_binary(data)
352 1
            crypto = security_policy.symmetric_cryptography
353
        elif header.MessageType == MessageType.SecureOpen:
354
            security_header = AsymmetricAlgorithmHeader.from_binary(data)
355
            crypto = security_policy.asymmetric_cryptography
356
        else:
357
            raise Exception("Unsupported message type: {}".format(header.MessageType))
358 1
        obj = MessageChunk(crypto)
359 1
        obj.MessageHeader = header
360 1
        obj.SecurityHeader = security_header
361 1
        decrypted = crypto.decrypt(data.read(len(data)))
362 1
        signature_size = crypto.vsignature_size()
363 1
        if signature_size > 0:
364
            signature = decrypted[-signature_size:]
365
            decrypted = decrypted[:-signature_size]
366
            crypto.verify(obj.MessageHeader.to_binary() + obj.SecurityHeader.to_binary() + decrypted, signature)
367 1
        data = utils.Buffer(crypto.remove_padding(decrypted))
368 1
        obj.SequenceHeader = SequenceHeader.from_binary(data)
369 1
        obj.Body = data.read(len(data))
370 1
        return obj
371
372 1
    def encrypted_size(self, plain_size):
373 1
        size = plain_size + self._security_policy.signature_size()
374 1
        pbs = self._security_policy.plain_block_size()
375 1
        assert(size % pbs == 0)
376 1
        return size // pbs * self._security_policy.encrypted_block_size()
377
378 1
    def to_binary(self):
379 1
        security = self.SecurityHeader.to_binary()
380 1
        encrypted_part = self.SequenceHeader.to_binary() + self.Body
381 1
        encrypted_part += self._security_policy.padding(len(encrypted_part))
382 1
        self.MessageHeader.body_size = len(security) + self.encrypted_size(len(encrypted_part))
383 1
        header = self.MessageHeader.to_binary()
384 1
        encrypted_part += self._security_policy.signature(header + security + encrypted_part)
385 1
        return header + security + self._security_policy.encrypt(encrypted_part)
386
387 1
    @staticmethod
388
    def max_body_size(crypto, max_chunk_size):
389 1
        max_encrypted_size = max_chunk_size - Header.max_size() - SymmetricAlgorithmHeader.max_size()
390 1
        max_plain_size = (max_encrypted_size // crypto.encrypted_block_size()) * crypto.plain_block_size()
391 1
        return max_plain_size - SequenceHeader.max_size() - crypto.signature_size() - crypto.min_padding_size()
392
393 1
    @staticmethod
394 1
    def message_to_chunks(security_policy, body, max_chunk_size, message_type=MessageType.SecureMessage, channel_id=1, request_id=1, token_id=1):
395
        """
396
        Pack message body (as binary string) into one or more chunks.
397
        Size of each chunk will not exceed max_chunk_size.
398
        Returns a list of MessageChunks. SequenceNumber is not initialized here,
399
        it must be set by Secure Channel driver.
400
        """
401 1
        if message_type == MessageType.SecureOpen:
402
            # SecureOpen message must be in a single chunk (specs, Part 6, 6.7.2)
403 1
            chunk = MessageChunk(security_policy.asymmetric_cryptography, body, message_type, ChunkType.Single)
404 1
            chunk.SecurityHeader.SecurityPolicyURI = security_policy.URI
405 1
            if security_policy.client_certificate:
406
                chunk.SecurityHeader.SenderCertificate = security_policy.client_certificate
407 1
            if security_policy.server_certificate:
408
                chunk.SecurityHeader.ReceiverCertificateThumbPrint = hashlib.sha1(security_policy.server_certificate).digest()
409 1
            chunk.MessageHeader.ChannelId = channel_id
410 1
            chunk.SequenceHeader.RequestId = request_id
411 1
            return [chunk]
412
413 1
        crypto = security_policy.symmetric_cryptography
414 1
        max_size = MessageChunk.max_body_size(crypto, max_chunk_size)
415
416 1
        chunks = []
417 1
        for i in range(0, len(body), max_size):
418 1
            part = body[i:i+max_size]
419 1
            if i+max_size >= len(body):
420 1
                chunk_type = ChunkType.Single
421
            else:
422 1
                chunk_type = ChunkType.Intermediate
423 1
            chunk = MessageChunk(crypto, part, message_type, chunk_type)
424 1
            chunk.SecurityHeader.TokenId = token_id
425 1
            chunk.MessageHeader.ChannelId = channel_id
426 1
            chunk.SequenceHeader.RequestId = request_id
427 1
            chunks.append(chunk)
428 1
        return chunks
429
430 1
    def __str__(self):
431
        return "{}({}, {}, {}, {} bytes)".format(self.__class__.__name__,
432
                self.MessageHeader, self.SequenceHeader, self.SecurityHeader, len(self.Body))
433 1
    __repr__ = __str__
434
435
436
# FIXES for missing switchfield in NodeAttributes classes
437 1
ana = auto.NodeAttributesMask
438
439
440 1
class ObjectAttributes(auto.ObjectAttributes):
441
442 1
    def __init__(self):
443 1
        auto.ObjectAttributes.__init__(self)
444 1
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.EventNotifier
445
446
447 1
class ObjectTypeAttributes(auto.ObjectTypeAttributes):
448
449 1
    def __init__(self):
450 1
        auto.ObjectTypeAttributes.__init__(self)
451 1
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.IsAbstract
452
453
454 1
class VariableAttributes(auto.VariableAttributes):
455
456 1
    def __init__(self):
457 1
        auto.VariableAttributes.__init__(self)
458 1
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.Value | ana.DataType | ana.ValueRank | ana.ArrayDimensions | ana.AccessLevel | ana.UserAccessLevel | ana.MinimumSamplingInterval | ana.Historizing
459 1
        self.Historizing = False
460
461
462 1
class VariableTypeAttributes(auto.VariableTypeAttributes):
463
464 1
    def __init__(self):
465 1
        auto.VariableTypeAttributes.__init__(self)
466 1
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.Value | ana.DataType | ana.ValueRank | ana.ArrayDimensions | ana.IsAbstract
467
468
469 1
class MethodAttributes(auto.MethodAttributes):
470
471 1
    def __init__(self):
472 1
        auto.MethodAttributes.__init__(self)
473 1
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.Executable | ana.UserExecutable
474
475
476 1
class ReferenceTypeAttributes(auto.ReferenceTypeAttributes):
477
478 1
    def __init__(self):
479 1
        auto.ReferenceTypeAttributes.__init__(self)
480 1
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.IsAbstract | ana.Symmetric | ana.InverseName
481
482
483 1
class DataTypeAttributes(auto.DataTypeAttributes):
484
485 1
    def __init__(self):
486 1
        auto.DataTypeAttributes.__init__(self)
487 1
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.IsAbstract
488
489
490 1
class ViewAttributes(auto.ViewAttributes):
491
492 1
    def __init__(self):
493
        auto.ViewAttributes.__init__(self)
494
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.ContainsNoLoops | ana.EventNotifier
495
496
497 1
class Argument(auto.Argument):
498
499 1
    def __init__(self):
500 1
        auto.Argument.__init__(self)
501 1
        self.ValueRank = -2
502
503
504
AttributeIdsInv = {v: k for k, v in AttributeIds.__dict__.items()}
505