Passed
Push — master ( 4a3463...98fd52 )
by Olivier
02:51 queued 10s
created

SecurityPolicy.make_remote_symmetric_key()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 3
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
import struct
2
3
from asyncua.ua import uaprotocol_auto as auto
4
from asyncua.ua import uatypes
5
from asyncua.common import utils
6
from asyncua.ua.uatypes import AccessLevel, FrozenClass
7
8
OPC_TCP_SCHEME = 'opc.tcp'
9
10
11
class Hello(uatypes.FrozenClass):
12
    ua_types = (
13
        ('ProtocolVersion', 'UInt32'), ('ReceiveBufferSize', 'UInt32'), ('SendBufferSize', 'UInt32'),
14
        ('MaxMessageSize', 'UInt32'), ('MaxChunkCount', 'UInt32'), ('EndpointUrl', 'String'),
15
    )
16
17
    def __init__(self):
18
        self.ProtocolVersion = 0
19
        self.ReceiveBufferSize = 65536
20
        self.SendBufferSize = 65536
21
        self.MaxMessageSize = 0  # No limits
22
        self.MaxChunkCount = 0  # No limits
23
        self.EndpointUrl = ""
24
        self._freeze = True
25
26
27
class MessageType:
28
    Invalid = b'INV'  # FIXME: check value
29
    Hello = b'HEL'
30
    Acknowledge = b'ACK'
31
    Error = b'ERR'
32
    SecureOpen = b'OPN'
33
    SecureClose = b'CLO'
34
    SecureMessage = b'MSG'
35
36
37
class ChunkType:
38
    Invalid = b'0'  # FIXME check
39
    Single = b'F'
40
    Intermediate = b'C'
41
    Abort = b'A'  # when an error occurred and the Message is aborted (body is ErrorMessage)
42
43
44
class Header(uatypes.FrozenClass):
45
    def __init__(self, msgType=None, chunkType=None, channelid=0):
46
        self.MessageType = msgType
47
        self.ChunkType = chunkType
48
        self.ChannelId = channelid
49
        self.body_size = 0
50
        self.packet_size = 0
51
        self.header_size = 8
52
        self._freeze = True
53
54
    def add_size(self, size):
55
        self.body_size += size
56
57
    @staticmethod
58
    def max_size():
59
        return struct.calcsize("<3scII")
60
61
    def __str__(self):
62
        return f'Header(type:{self.MessageType}, chunk_type:{self.ChunkType}, body_size:{self.body_size}, channel:{self.ChannelId})'
63
64
    __repr__ = __str__
65
66
67
class ErrorMessage(uatypes.FrozenClass):
68
    ua_types = (('Error', 'StatusCode'), ('Reason', 'String'),)
69
70
    def __init__(self):
71
        self.Error = uatypes.StatusCode()
72
        self.Reason = ""
73
        self._freeze = True
74
75
    def __str__(self):
76
        return f'MessageAbort(error:{self.Error}, reason:{self.Reason})'
77
78
    __repr__ = __str__
79
80
81
class Acknowledge(uatypes.FrozenClass):
82
    ua_types = [
83
        ('ProtocolVersion', 'UInt32'),
84
        ('ReceiveBufferSize', 'UInt32'),
85
        ('SendBufferSize', 'UInt32'),
86
        ('MaxMessageSize', 'UInt32'),
87
        ('MaxChunkCount', 'UInt32'),
88
    ]
89
90
    def __init__(self):
91
        self.ProtocolVersion = 0
92
        self.ReceiveBufferSize = 65536
93
        self.SendBufferSize = 65536
94
        self.MaxMessageSize = 0  # No limits
95
        self.MaxChunkCount = 0  # No limits
96
        self._freeze = True
97
98
99
class AsymmetricAlgorithmHeader(uatypes.FrozenClass):
100
    ua_types = [
101
        ('SecurityPolicyURI', 'String'),
102
        ('SenderCertificate', 'ByteString'),
103
        ('ReceiverCertificateThumbPrint', 'ByteString'),
104
    ]
105
106
    def __init__(self):
107
        self.SecurityPolicyURI = 'http://opcfoundation.org/UA/SecurityPolicy#None'
108
        self.SenderCertificate = None
109
        self.ReceiverCertificateThumbPrint = None
110
        self._freeze = True
111
112
    def __str__(self):
113
        size1 = len(self.SenderCertificate) if self.SenderCertificate is not None else None
114
        size2 = len(self.ReceiverCertificateThumbPrint) if self.ReceiverCertificateThumbPrint is not None else None
115
        return f'{self.__class__.__name__}(SecurityPolicy:{self.SecurityPolicyURI}, certificatesize:{size2}, receiverCertificatesize:{size2} )'
116
117
    __repr__ = __str__
118
119
120
class SymmetricAlgorithmHeader(uatypes.FrozenClass):
121
    ua_types = [
122
        ('TokenId', 'UInt32'),
123
    ]
124
125
    def __init__(self):
126
        self.TokenId = 0
127
        self._freeze = True
128
129
    @staticmethod
130
    def max_size():
131
        return struct.calcsize('<I')
132
133
    def __str__(self):
134
        return f'{self.__class__.__name__}(TokenId:{self.TokenId} )'
135
136
    __repr__ = __str__
137
138
139
class SequenceHeader(uatypes.FrozenClass):
140
    ua_types = [
141
        ('SequenceNumber', 'UInt32'),
142
        ('RequestId', 'UInt32'),
143
    ]
144
145
    def __init__(self):
146
        self.SequenceNumber = None
147
        self.RequestId = None
148
        self._freeze = True
149
150
    @staticmethod
151
    def max_size():
152
        return struct.calcsize('<II')
153
154
    def __str__(self):
155
        return f'{self.__class__.__name__}(SequenceNumber:{self.SequenceNumber}, RequestId:{self.RequestId} )'
156
157
    __repr__ = __str__
158
159
160
class CryptographyNone:
161
    """
162
    Base class for symmetric/asymmetric cryprography
163
    """
164
165
    def __init__(self):
166
        pass
167
168
    def plain_block_size(self):
169
        """
170
        Size of plain text block for block cipher.
171
        """
172
        return 1
173
174
    def encrypted_block_size(self):
175
        """
176
        Size of encrypted text block for block cipher.
177
        """
178
        return 1
179
180
    def padding(self, size):
181
        """
182
        Create padding for a block of given size.
183
        plain_size = size + len(padding) + signature_size()
184
        plain_size = N * plain_block_size()
185
        """
186
        return b''
187
188
    def min_padding_size(self):
189
        return 0
190
191
    def signature_size(self):
192
        return 0
193
194
    def signature(self, data):
195
        return b''
196
197
    def encrypt(self, data):
198
        return data
199
200
    def decrypt(self, data):
201
        return data
202
203
    def vsignature_size(self):
204
        return 0
205
206
    def verify(self, data, signature):
207
        """
208
        Verify signature and raise exception if signature is invalid
209
        """
210
        pass
211
212
    def remove_padding(self, data):
213
        return data
214
215
216
class SecurityPolicy:
217
    """
218
    Base class for security policy
219
    """
220
    URI = 'http://opcfoundation.org/UA/SecurityPolicy#None'
221
    AsymmetricSignatureURI = ''
222
    signature_key_size = 0
223
    symmetric_key_size = 0
224
225
    def __init__(self):
226
        self.asymmetric_cryptography = CryptographyNone()
227
        self.symmetric_cryptography = CryptographyNone()
228
        self.Mode = auto.MessageSecurityMode.None_
229
        self.server_certificate = None
230
        self.client_certificate = None
231
232
    def make_local_symmetric_key(self, secret, seed):
233
        pass
234
235
    def make_remote_symmetric_key(self, secret, seed):
236
        pass
237
238
239
class SecurityPolicyFactory:
240
    """
241
    Helper class for creating server-side SecurityPolicy.
242
    Server has one certificate and private key, but needs a separate
243
    SecurityPolicy for every client and client's certificate
244
    """
245
246
    def __init__(self, cls=SecurityPolicy, mode=auto.MessageSecurityMode.None_, certificate=None, private_key=None):
247
        self.cls = cls
248
        self.mode = mode
249
        self.certificate = certificate
250
        self.private_key = private_key
251
252
    def matches(self, uri, mode=None):
253
        return self.cls.URI == uri and (mode is None or self.mode == mode)
254
255
    def create(self, peer_certificate):
256
        if self.cls is SecurityPolicy:
257
            return self.cls()
258
        else:
259
            return self.cls(peer_certificate, self.certificate, self.private_key, self.mode)
260
261
262
class Message:
263
    def __init__(self, chunks):
264
        self._chunks = chunks
265
266
    def request_id(self):
267
        return self._chunks[0].SequenceHeader.RequestId
268
269
    def SequenceHeader(self):
270
        return self._chunks[0].SequenceHeader
271
272
    def SecurityHeader(self):
273
        return self._chunks[0].SecurityHeader
274
275
    def body(self):
276
        body = b"".join([c.Body for c in self._chunks])
277
        return utils.Buffer(body)
278
279
280
# FIXES for missing switchfield in NodeAttributes classes
281
ana = auto.NodeAttributesMask
282
283
284
class ObjectAttributes(auto.ObjectAttributes):
285
    def __init__(self):
286
        auto.ObjectAttributes.__init__(self)
287
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.EventNotifier
288
289
290
class ObjectTypeAttributes(auto.ObjectTypeAttributes):
291
    def __init__(self):
292
        auto.ObjectTypeAttributes.__init__(self)
293
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.IsAbstract
294
295
296
class VariableAttributes(auto.VariableAttributes):
297
    def __init__(self):
298
        auto.VariableAttributes.__init__(self)
299
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.Value | ana.DataType | ana.ValueRank | ana.ArrayDimensions | ana.AccessLevel | ana.UserAccessLevel | ana.MinimumSamplingInterval | ana.Historizing
300
        self.Historizing = False
301
        self.AccessLevel = AccessLevel.CurrentRead.mask
302
        self.UserAccessLevel = AccessLevel.CurrentRead.mask
303
304
305
class VariableTypeAttributes(auto.VariableTypeAttributes):
306
    def __init__(self):
307
        auto.VariableTypeAttributes.__init__(self)
308
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.Value | ana.DataType | ana.ValueRank | ana.ArrayDimensions | ana.IsAbstract
309
310
311
class MethodAttributes(auto.MethodAttributes):
312
    def __init__(self):
313
        auto.MethodAttributes.__init__(self)
314
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.Executable | ana.UserExecutable
315
316
317
class ReferenceTypeAttributes(auto.ReferenceTypeAttributes):
318
    def __init__(self):
319
        auto.ReferenceTypeAttributes.__init__(self)
320
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.IsAbstract | ana.Symmetric | ana.InverseName
321
322
323
class DataTypeAttributes(auto.DataTypeAttributes):
324
    def __init__(self):
325
        auto.DataTypeAttributes.__init__(self)
326
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.IsAbstract
327
328
329
class ViewAttributes(auto.ViewAttributes):
330
    def __init__(self):
331
        auto.ViewAttributes.__init__(self)
332
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.ContainsNoLoops | ana.EventNotifier
333
334
335
class Argument(auto.Argument):
336
    def __init__(self):
337
        auto.Argument.__init__(self)
338
        self.ValueRank = -2
339
340
341
class XmlElement(FrozenClass):
342
    '''
343
    An XML element encoded as a UTF-8 string.
344
    :ivar Value:
345
    :vartype Value: String
346
    '''
347
348
    ua_types = [
349
        ('Value', 'String'),
350
    ]
351
352
    def __init__(self, xml=""):
353
        self.Value = xml
354
        self._freeze = True
355
356
    def __str__(self):
357
        return f'XmlElement(Value:{self.Value})'
358
359
    __repr__ = __str__
360
361
    def __eq__(self, el):
362
        return isinstance(el, XmlElement) and self.Value == el.Value
363