Passed
Push — dev ( 454bf6...555c2d )
by Olivier
04:48 queued 02:26
created

opcua.VariableAttributes.__init__()   A

Complexity

Conditions 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1
Metric Value
dl 0
loc 4
ccs 4
cts 4
cp 1
rs 10
cc 1
crap 1
1 1
import struct
2 1
import logging
3 1
import hashlib
4 1
from enum import IntEnum
5
6 1
import opcua.uaprotocol_auto as auto
7 1
import opcua.uatypes as uatypes
8 1
from opcua.uatypes import uatype_UInt32
9 1
import opcua.utils as utils
10 1
from opcua.object_ids import ObjectIds
11 1
from opcua.attribute_ids import AttributeIds
12
13 1
# Module-level logger; used below to trace incoming TCP message headers.
logger = logging.getLogger('opcua.uaprotocol')
14
15 1
# Scheme name constant; not referenced elsewhere in the visible part of this
# file (presumably used by callers when building/parsing endpoint URLs).
OPC_TCP_SCHEME = 'opc.tcp'
16
17
18 1
class ValueRank(IntEnum):
    """
    Defines dimensions of a variable.
    This enum does not support all cases since ValueRank supports any n>0,
    but since it is an IntEnum it can be replaced by a normal int.
    """
    ScalarOrOneDimension = -3
    Any = -2
    Scalar = -1
    OneOrMoreDimensions = 0
    OneDimension = 1
    # the next names are not in spec but so common we express them here
    TwoDimensions = 2
    ThreeDimensions = 3
    # Must be 4: a duplicate value would silently turn this member into an
    # alias of ThreeDimensions and make ValueRank(4) invalid.
    FourDimensions = 4
33
34
35 1
class AccessLevelMask(object):
    """
    Constants used by AccessLevel and UserAccessLevel.

    The values are the consecutive integers 0..4 (presumably bit positions
    rather than ready-made mask values -- confirm against callers).
    """
    (CurrentRead,
     CurrentWrite,
     HistoryRead,
     HistoryWrite,
     SemanticChange) = range(5)
44
45
46 1
class Hello(uatypes.FrozenClass):
    """
    Hello message: five UInt32 fields followed by the endpoint URL string.
    """

    def __init__(self):
        self.ProtocolVersion = 0
        self.ReceiveBufferSize = 65536
        self.SendBufferSize = 65536
        self.MaxMessageSize = 0
        self.MaxChunkCount = 0
        self.EndpointUrl = ""
        self._freeze = True

    def to_binary(self):
        """
        Serialize the numeric fields in declaration order, then EndpointUrl.
        """
        numbers = (
            self.ProtocolVersion,
            self.ReceiveBufferSize,
            self.SendBufferSize,
            self.MaxMessageSize,
            self.MaxChunkCount,
        )
        pieces = [uatype_UInt32.pack(number) for number in numbers]
        pieces.append(uatypes.pack_string(self.EndpointUrl))
        return b"".join(pieces)

    @staticmethod
    def from_binary(data):
        """
        Build a Hello by reading five 4-byte integers and a string from data.
        """
        hello = Hello()
        # Same order as to_binary(); each field occupies 4 bytes.
        for field in ("ProtocolVersion",
                      "ReceiveBufferSize",
                      "SendBufferSize",
                      "MaxMessageSize",
                      "MaxChunkCount"):
            setattr(hello, field, uatype_UInt32.unpack(data.read(4))[0])
        hello.EndpointUrl = uatypes.unpack_string(data)
        return hello
77
78
79 1
class MessageType(object):
    """
    Three-byte message type tags found at the start of a message header.
    """
    # FIXME: check value
    Invalid = b"INV"

    # Tags dispatched directly by tcp_message_from_header_and_body().
    Hello = b"HEL"
    Acknowledge = b"ACK"
    Error = b"ERR"

    # Tags whose header additionally carries a channel id.
    SecureOpen = b"OPN"
    SecureClose = b"CLO"
    SecureMessage = b"MSG"
87
88
89 1
class ChunkType(object):
    """
    One-byte chunk type tags stored right after the message type.
    """
    # FIXME check
    Invalid = b"0"
    # Used for a message's last (or only) chunk.
    Single = b"F"
    # Used for every chunk that is followed by another one.
    Intermediate = b"C"
    # when an error occurred and the Message is aborted (body is ErrorMessage)
    Abort = b"A"
94
95
96 1
class Header(uatypes.FrozenClass):
    """
    Message header: 3-byte message type, 1-byte chunk type, UInt32 total size,
    plus a UInt32 channel id for the secure message types (OPN/CLO/MSG).
    """

    def __init__(self, msgType=None, chunkType=None, channelid=0):
        self.MessageType = msgType
        self.ChunkType = chunkType
        self.ChannelId = channelid
        self.body_size = 0
        self.packet_size = 0
        self._freeze = True

    def add_size(self, size):
        """
        Increase body_size by size.
        """
        self.body_size = self.body_size + size

    def to_binary(self):
        """
        Serialize the header; the size field covers header plus body.
        """
        secure = self.MessageType in (MessageType.SecureOpen, MessageType.SecureClose, MessageType.SecureMessage)
        # 8 bytes of type/chunk/size, plus 4 for the channel id when present.
        total = self.body_size + 8
        if secure:
            total += 4
        pieces = [
            struct.pack("<3ss", self.MessageType, self.ChunkType),
            uatype_UInt32.pack(total),
        ]
        if secure:
            pieces.append(uatype_UInt32.pack(self.ChannelId))
        return b"".join(pieces)

    @staticmethod
    def from_string(data):
        """
        Read a header from data (anything with a read(n) method).

        body_size is derived from the size field by subtracting the header
        bytes that were consumed.
        """
        hdr = Header()
        msg_type, chunk_type, packet_size = struct.unpack("<3scI", data.read(8))
        hdr.MessageType = msg_type
        hdr.ChunkType = chunk_type
        hdr.packet_size = packet_size
        hdr.body_size = hdr.packet_size - 8
        if hdr.MessageType in (MessageType.SecureOpen, MessageType.SecureClose, MessageType.SecureMessage):
            hdr.body_size -= 4
            hdr.ChannelId = uatype_UInt32.unpack(data.read(4))[0]
        return hdr

    @staticmethod
    def max_size():
        """
        Size in bytes of the longest header form (the one with a channel id).
        """
        return struct.calcsize("<3scII")

    def __str__(self):
        template = "Header(type:{}, chunk_type:{}, body_size:{}, channel:{})"
        return template.format(self.MessageType, self.ChunkType, self.body_size, self.ChannelId)
    __repr__ = __str__
137
138
139 1
class ErrorMessage(uatypes.FrozenClass):
    """
    Error message body: a StatusCode followed by a reason string.
    """

    def __init__(self):
        self.Error = uatypes.StatusCode()
        self.Reason = ""
        self._freeze = True

    def to_binary(self):
        """
        Serialize the status code, then the reason.
        """
        return b"".join([
            self.Error.to_binary(),
            uatypes.pack_string(self.Reason),
        ])

    @staticmethod
    def from_binary(data):
        """
        Read a status code and a reason string from data.
        """
        message = ErrorMessage()
        message.Error = uatypes.StatusCode.from_binary(data)
        message.Reason = uatypes.unpack_string(data)
        return message

    def __str__(self):
        template = "MessageAbort(error:{}, reason:{})"
        return template.format(self.Error, self.Reason)
    __repr__ = __str__
162
163
164 1
class Acknowledge(uatypes.FrozenClass):
    """
    Acknowledge message: five little-endian UInt32 fields (20 bytes).
    """

    def __init__(self):
        self.ProtocolVersion = 0
        self.ReceiveBufferSize = 65536
        self.SendBufferSize = 65536
        self.MaxMessageSize = 0  # No limits
        self.MaxChunkCount = 0  # No limits
        self._freeze = True

    def to_binary(self):
        """
        Pack the five fields in declaration order.
        """
        values = (
            self.ProtocolVersion,
            self.ReceiveBufferSize,
            self.SendBufferSize,
            self.MaxMessageSize,
            self.MaxChunkCount,
        )
        return struct.pack("<5I", *values)

    @staticmethod
    def from_binary(data):
        """
        Read 20 bytes from data and populate a new Acknowledge.
        """
        ack = Acknowledge()
        version, receive_size, send_size, max_message, max_chunks = struct.unpack("<5I", data.read(20))
        ack.ProtocolVersion = version
        ack.ReceiveBufferSize = receive_size
        ack.SendBufferSize = send_size
        ack.MaxMessageSize = max_message
        ack.MaxChunkCount = max_chunks
        return ack
189
190
191 1
class AsymmetricAlgorithmHeader(uatypes.FrozenClass):
    """
    Security header used for SecureOpen chunks.

    Holds the security policy URI (text) and two binary fields: the sender
    certificate and the thumbprint of the receiver certificate.
    """

    def __init__(self):
        self.SecurityPolicyURI = "http://opcfoundation.org/UA/SecurityPolicy#None"
        self.SenderCertificate = b""
        self.ReceiverCertificateThumbPrint = b""
        self._freeze = True

    def to_binary(self):
        """
        Serialize the three fields, each through uatypes.pack_string.
        """
        b = []
        b.append(uatypes.pack_string(self.SecurityPolicyURI))
        b.append(uatypes.pack_string(self.SenderCertificate))
        b.append(uatypes.pack_string(self.ReceiverCertificateThumbPrint))
        return b"".join(b)

    @staticmethod
    def from_binary(data):
        """
        Read the three fields from data and return a new header.
        """
        hdr = AsymmetricAlgorithmHeader()
        # The URI is text: decode it as a string so a parsed header carries
        # the same type as a freshly constructed one (and as policy URIs it
        # gets compared with). The certificate fields stay raw bytes.
        hdr.SecurityPolicyURI = uatypes.unpack_string(data)
        hdr.SenderCertificate = uatypes.unpack_bytes(data)
        hdr.ReceiverCertificateThumbPrint = uatypes.unpack_bytes(data)
        return hdr

    def __str__(self):
        return "{}(SecurytyPolicy:{}, certificatesize:{}, receiverCertificatesize:{} )".format(self.__class__.__name__, self.SecurityPolicyURI, len(self.SenderCertificate), len(self.ReceiverCertificateThumbPrint))
    __repr__ = __str__
217
218
219 1
class SymmetricAlgorithmHeader(uatypes.FrozenClass):
    """
    Security header used for SecureMessage/SecureClose chunks: one UInt32
    token id.
    """

    def __init__(self):
        self.TokenId = 0
        self._freeze = True

    @staticmethod
    def from_binary(data):
        """
        Read the 4-byte token id from data.
        """
        header = SymmetricAlgorithmHeader()
        (token_id,) = uatype_UInt32.unpack(data.read(4))[:1]
        header.TokenId = token_id
        return header

    def to_binary(self):
        """
        Serialize the token id.
        """
        packed = uatype_UInt32.pack(self.TokenId)
        return packed

    @staticmethod
    def max_size():
        """
        Size in bytes of the serialized header.
        """
        return struct.calcsize("<I")

    def __str__(self):
        template = "{}(TokenId:{} )"
        return template.format(self.__class__.__name__, self.TokenId)
    __repr__ = __str__
241
242
243 1
class SequenceHeader(uatypes.FrozenClass):
    """
    Sequence header: UInt32 sequence number followed by UInt32 request id.

    Both fields start as None and must be set before to_binary() is called.
    """

    def __init__(self):
        self.SequenceNumber = None
        self.RequestId = None
        self._freeze = True

    @staticmethod
    def from_binary(data):
        """
        Read two 4-byte integers from data.
        """
        header = SequenceHeader()
        for field in ("SequenceNumber", "RequestId"):
            setattr(header, field, uatype_UInt32.unpack(data.read(4))[0])
        return header

    def to_binary(self):
        """
        Serialize the sequence number, then the request id.
        """
        return b"".join([
            uatype_UInt32.pack(self.SequenceNumber),
            uatype_UInt32.pack(self.RequestId),
        ])

    @staticmethod
    def max_size():
        """
        Size in bytes of the serialized header.
        """
        return struct.calcsize("<II")

    def __str__(self):
        template = "{}(SequenceNumber:{}, RequestId:{} )"
        return template.format(self.__class__.__name__, self.SequenceNumber, self.RequestId)
    __repr__ = __str__
270
271
272 1
class CryptographyNone:
    """
    Base class for symmetric/asymmetric cryptography.

    Every operation here is a no-op: data passes through unchanged, sizes
    are 0 (block sizes 1) and signatures/padding are empty.
    """

    def __init__(self):
        """
        Nothing to set up.
        """

    def plain_block_size(self):
        """
        Size of plain text block for block cipher.
        """
        return 1

    def encrypted_block_size(self):
        """
        Size of encrypted text block for block cipher.
        """
        return 1

    def padding(self, size):
        """
        Create padding for a block of given size.
        plain_size = size + len(padding) + signature_size()
        plain_size = N * plain_block_size()

        Always empty here.
        """
        return b''

    def min_padding_size(self):
        """
        Minimal number of padding bytes; none here.
        """
        return 0

    def signature_size(self):
        """
        Length of the signature produced by signature(); none here.
        """
        return 0

    def signature(self, data):
        """
        Signature for data; always empty here.
        """
        return b''

    def encrypt(self, data):
        """
        Return data unchanged.
        """
        return data

    def decrypt(self, data):
        """
        Return data unchanged.
        """
        return data

    def vsignature_size(self):
        """
        Length of the signature expected by verify(); none here.
        """
        return 0

    def verify(self, data, signature):
        """
        Verify signature and raise exception if signature is invalid.

        This implementation accepts everything.
        """

    def remove_padding(self, data):
        """
        Return data unchanged (no padding was added).
        """
        return data
325
326
327 1
class SecurityPolicy:
    """
    Base class for security policy.

    Uses the no-op CryptographyNone for both the asymmetric and the
    symmetric side and starts with empty certificates.
    """
    URI = "http://opcfoundation.org/UA/SecurityPolicy#None"
    signature_key_size = 0
    symmetric_key_size = 0

    def __init__(self):
        self.asymmetric_cryptography = CryptographyNone()
        self.symmetric_cryptography = CryptographyNone()
        self.Mode = auto.MessageSecurityMode.None_
        # Empty certificates are skipped by MessageChunk.message_to_chunks().
        self.server_certificate = b""
        self.client_certificate = b""

    def make_symmetric_key(self, a, b):
        """
        No key material is needed for this policy; does nothing.
        """
344
345
346 1
class MessageChunk(uatypes.FrozenClass):
    """
    Message Chunk, as described in OPC UA specs Part 6, 6.7.2.

    Note: despite its name, the security_policy constructor argument is
    used as a cryptography object (signature_size(), padding(), encrypt(),
    ...); the static constructors in this class pass
    policy.symmetric_cryptography or policy.asymmetric_cryptography.
    """

    def __init__(self, security_policy, body=b'', msg_type=MessageType.SecureMessage, chunk_type=ChunkType.Single):
        self.MessageHeader = Header(msg_type, chunk_type)
        # The kind of security header depends on the message type.
        if msg_type == MessageType.SecureOpen:
            self.SecurityHeader = AsymmetricAlgorithmHeader()
        elif msg_type in (MessageType.SecureMessage, MessageType.SecureClose):
            self.SecurityHeader = SymmetricAlgorithmHeader()
        else:
            raise Exception("Unsupported message type: {}".format(msg_type))
        self.SequenceHeader = SequenceHeader()
        self.Body = body
        self._security_policy = security_policy

    @staticmethod
    def from_binary(security_policy, data):
        """
        Parse a complete chunk (message header included) from data.
        """
        return MessageChunk.from_header_and_body(security_policy, Header.from_string(data), data)

    @staticmethod
    def from_header_and_body(security_policy, header, data):
        """
        Parse the remainder of a chunk whose message header is already read.

        Reads the security header, decrypts everything left in data,
        verifies and strips the signature when the crypto reports one,
        removes padding, then splits the result into sequence header and body.
        Raises Exception for non-secure message types.
        """
        msg_type = header.MessageType
        if msg_type == MessageType.SecureOpen:
            security_header = AsymmetricAlgorithmHeader.from_binary(data)
            crypto = security_policy.asymmetric_cryptography
        elif msg_type in (MessageType.SecureMessage, MessageType.SecureClose):
            security_header = SymmetricAlgorithmHeader.from_binary(data)
            crypto = security_policy.symmetric_cryptography
        else:
            raise Exception("Unsupported message type: {}".format(header.MessageType))

        chunk = MessageChunk(crypto)
        chunk.MessageHeader = header
        chunk.SecurityHeader = security_header

        plain = crypto.decrypt(data.read(len(data)))
        sig_len = crypto.vsignature_size()
        if sig_len > 0:
            # The signature trails the plain text and covers both headers.
            signature = plain[-sig_len:]
            plain = plain[:-sig_len]
            signed = chunk.MessageHeader.to_binary() + chunk.SecurityHeader.to_binary() + plain
            crypto.verify(signed, signature)

        payload = utils.Buffer(crypto.remove_padding(plain))
        chunk.SequenceHeader = SequenceHeader.from_binary(payload)
        chunk.Body = payload.read(len(payload))
        return chunk

    def encrypted_size(self, plain_size):
        """
        Size after encryption of plain_size bytes plus the signature.

        The signed plain text must be a whole number of plain blocks.
        """
        crypto = self._security_policy
        total = plain_size + crypto.signature_size()
        block = crypto.plain_block_size()
        assert(total % block == 0)
        return total // block * crypto.encrypted_block_size()

    def to_binary(self):
        """
        Serialize the chunk: header + security header + encrypted part.

        Also updates MessageHeader.body_size, because the header must be
        final before it is signed.
        """
        crypto = self._security_policy
        security = self.SecurityHeader.to_binary()
        plain = self.SequenceHeader.to_binary() + self.Body
        plain += crypto.padding(len(plain))
        self.MessageHeader.body_size = len(security) + self.encrypted_size(len(plain))
        header = self.MessageHeader.to_binary()
        plain += crypto.signature(header + security + plain)
        return header + security + crypto.encrypt(plain)

    @staticmethod
    def max_body_size(crypto, max_chunk_size):
        """
        Largest body that fits into a chunk of max_chunk_size bytes.
        """
        header_overhead = Header.max_size() + SymmetricAlgorithmHeader.max_size()
        max_encrypted_size = max_chunk_size - header_overhead
        blocks = max_encrypted_size // crypto.encrypted_block_size()
        max_plain_size = blocks * crypto.plain_block_size()
        plain_overhead = SequenceHeader.max_size() + crypto.signature_size() + crypto.min_padding_size()
        return max_plain_size - plain_overhead

    @staticmethod
    def message_to_chunks(security_policy, body, max_chunk_size, message_type=MessageType.SecureMessage, channel_id=1, request_id=1, token_id=1):
        """
        Pack message body (as binary string) into one or more chunks.
        Size of each chunk will not exceed max_chunk_size.
        Returns a list of MessageChunks. SequenceNumber is not initialized here,
        it must be set by Secure Channel driver.
        """
        if message_type == MessageType.SecureOpen:
            # SecureOpen message must be in a single chunk (specs, Part 6, 6.7.2)
            chunk = MessageChunk(security_policy.asymmetric_cryptography, body, message_type, ChunkType.Single)
            security_header = chunk.SecurityHeader
            security_header.SecurityPolicyURI = security_policy.URI
            if security_policy.client_certificate:
                security_header.SenderCertificate = security_policy.client_certificate
            if security_policy.server_certificate:
                security_header.ReceiverCertificateThumbPrint = hashlib.sha1(security_policy.server_certificate).digest()
            chunk.MessageHeader.ChannelId = channel_id
            chunk.SequenceHeader.RequestId = request_id
            return [chunk]

        crypto = security_policy.symmetric_cryptography
        max_size = MessageChunk.max_body_size(crypto, max_chunk_size)

        total = len(body)
        chunks = []
        for start in range(0, total, max_size):
            end = start + max_size
            # Only the slice that reaches the end of the body is final.
            chunk_type = ChunkType.Single if end >= total else ChunkType.Intermediate
            chunk = MessageChunk(crypto, body[start:end], message_type, chunk_type)
            chunk.SecurityHeader.TokenId = token_id
            chunk.MessageHeader.ChannelId = channel_id
            chunk.SequenceHeader.RequestId = request_id
            chunks.append(chunk)
        return chunks

    def __str__(self):
        template = "{}({}, {}, {}, {} bytes)"
        return template.format(self.__class__.__name__,
                               self.MessageHeader, self.SequenceHeader, self.SecurityHeader, len(self.Body))
    __repr__ = __str__
453
454
455 1
def tcp_message_from_header_and_body(security_policy, header, body):
    """
    Convert binary stream to OPC UA TCP message (see OPC UA specs Part 6, 7.1)

    Secure message types (open/close/message) are parsed into a MessageChunk
    using security_policy; Hello, Acknowledge and Error are parsed into their
    own classes. Any other message type raises Exception.
    """
    msg_type = header.MessageType
    if msg_type in (MessageType.SecureOpen, MessageType.SecureClose, MessageType.SecureMessage):
        return MessageChunk.from_header_and_body(security_policy, header, body)
    if msg_type == MessageType.Hello:
        return Hello.from_binary(body)
    if msg_type == MessageType.Acknowledge:
        return Acknowledge.from_binary(body)
    if msg_type == MessageType.Error:
        return ErrorMessage.from_binary(body)
    raise Exception("Unsupported message type {}".format(header.MessageType))
470
471
472 1
def tcp_message_from_socket(security_policy, socket):
    """
    Read one message from socket: first the header, then exactly
    header.body_size bytes, and parse them into a message object.

    socket only needs a read(n) method. Raises Exception when fewer bytes
    than announced are returned.
    """
    logger.debug("Waiting for header")
    header = Header.from_string(socket)
    logger.info("received header: %s", header)
    payload = socket.read(header.body_size)
    received = len(payload)
    if received != header.body_size:
        raise Exception("{} bytes expected, {} available".format(header.body_size, received))
    buffered = utils.Buffer(payload)
    return tcp_message_from_header_and_body(security_policy, header, buffered)
480
481
482
# FIXES for missing switchfield in NodeAttributes classes
483 1
# Short alias; the *Attributes subclasses below use it to build their
# SpecifiedAttributes masks.
ana = auto.NodeAttributesMask
484
485
486 1
class ObjectAttributes(auto.ObjectAttributes):
    """
    auto.ObjectAttributes with SpecifiedAttributes preset to the mask below.
    """

    def __init__(self):
        auto.ObjectAttributes.__init__(self)
        self.SpecifiedAttributes = (
            ana.DisplayName
            | ana.Description
            | ana.WriteMask
            | ana.UserWriteMask
            | ana.EventNotifier
        )
491
492
493 1
class ObjectTypeAttributes(auto.ObjectTypeAttributes):
    """
    auto.ObjectTypeAttributes with SpecifiedAttributes preset to the mask
    below.
    """

    def __init__(self):
        auto.ObjectTypeAttributes.__init__(self)
        self.SpecifiedAttributes = (
            ana.DisplayName
            | ana.Description
            | ana.WriteMask
            | ana.UserWriteMask
            | ana.IsAbstract
        )
498
499
500 1
class VariableAttributes(auto.VariableAttributes):
    """
    auto.VariableAttributes with SpecifiedAttributes preset to the mask below
    and Historizing set to False.
    """

    def __init__(self):
        auto.VariableAttributes.__init__(self)
        self.SpecifiedAttributes = (
            ana.DisplayName
            | ana.Description
            | ana.WriteMask
            | ana.UserWriteMask
            | ana.Value
            | ana.DataType
            | ana.ValueRank
            | ana.ArrayDimensions
            | ana.AccessLevel
            | ana.UserAccessLevel
            | ana.MinimumSamplingInterval
            | ana.Historizing
        )
        self.Historizing = False
506
507
508 1
class VariableTypeAttributes(auto.VariableTypeAttributes):
    """
    auto.VariableTypeAttributes with SpecifiedAttributes preset to the mask
    below.
    """

    def __init__(self):
        auto.VariableTypeAttributes.__init__(self)
        self.SpecifiedAttributes = (
            ana.DisplayName
            | ana.Description
            | ana.WriteMask
            | ana.UserWriteMask
            | ana.Value
            | ana.DataType
            | ana.ValueRank
            | ana.ArrayDimensions
            | ana.IsAbstract
        )
513
514
515 1
class MethodAttributes(auto.MethodAttributes):
    """
    auto.MethodAttributes with SpecifiedAttributes preset to the mask below.
    """

    def __init__(self):
        auto.MethodAttributes.__init__(self)
        self.SpecifiedAttributes = (
            ana.DisplayName
            | ana.Description
            | ana.WriteMask
            | ana.UserWriteMask
            | ana.Executable
            | ana.UserExecutable
        )
520
521
522 1
class ReferenceTypeAttributes(auto.ReferenceTypeAttributes):
    """
    auto.ReferenceTypeAttributes with SpecifiedAttributes preset to the mask
    below.
    """

    def __init__(self):
        auto.ReferenceTypeAttributes.__init__(self)
        self.SpecifiedAttributes = (
            ana.DisplayName
            | ana.Description
            | ana.WriteMask
            | ana.UserWriteMask
            | ana.IsAbstract
            | ana.Symmetric
            | ana.InverseName
        )
527
528
529 1
class DataTypeAttributes(auto.DataTypeAttributes):
    """
    auto.DataTypeAttributes with SpecifiedAttributes preset to the mask below.
    """

    def __init__(self):
        auto.DataTypeAttributes.__init__(self)
        self.SpecifiedAttributes = (
            ana.DisplayName
            | ana.Description
            | ana.WriteMask
            | ana.UserWriteMask
            | ana.IsAbstract
        )
534
535
536 1
class ViewAttributes(auto.ViewAttributes):
    """
    auto.ViewAttributes with SpecifiedAttributes preset to the mask below.
    """

    def __init__(self):
        auto.ViewAttributes.__init__(self)
        self.SpecifiedAttributes = (
            ana.DisplayName
            | ana.Description
            | ana.WriteMask
            | ana.UserWriteMask
            | ana.ContainsNoLoops
            | ana.EventNotifier
        )
541
542
543 1
class Argument(auto.Argument):
    """
    auto.Argument whose ValueRank starts at -2 instead of the generated
    default.
    """

    def __init__(self):
        auto.Argument.__init__(self)
        # -2 is the same number as ValueRank.Any defined in this module.
        self.ValueRank = -2
548
549
550
#AttributeIdsInv = {v: k for k, v in AttributeIds.__dict__.items()}
551