Completed
Push — master ( 1d0007...a154b4 )
by Olivier
04:03
created

opcua.SecurityPolicyBasic256   A

Complexity

Total Complexity 2

Size/Duplication

Total Lines 57
Duplicated Lines 0 %
Metric Value
wmc 2
dl 0
loc 57
rs 10

2 Methods

Rating   Name   Duplication   Size   Complexity  
A make_symmetric_key() 0 10 1
A __init__() 0 16 1
1
from opcua.uaprotocol import CryptographyNone, SecurityPolicy
2
from opcua.uaprotocol import MessageSecurityMode
3
from abc import ABCMeta, abstractmethod
4
try:
5
    from cryptography import x509, exceptions
6
    from cryptography.hazmat.backends import default_backend
7
    from cryptography.hazmat.primitives.asymmetric import padding
8
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
9
    from cryptography.hazmat.primitives import hashes, serialization, hmac
10
    CRYPTOGRAPHY_AVAILABLE = True
11
except ImportError:
12
    CRYPTOGRAPHY_AVAILABLE = False
13
14
15
def require_cryptography(obj):
    """
    Guard for classes that depend on the optional cryptography package.

    Intended to be called from constructors: returns silently when the
    package was imported successfully, otherwise raises an Exception
    whose message names the class of obj.
    """
    if CRYPTOGRAPHY_AVAILABLE:
        return
    class_name = obj.__class__.__name__
    raise Exception("Can't use {}, cryptography module is not installed"
                    .format(class_name))
23
24
25
class Signer(object):
    """
    Interface implemented by every signature-producing algorithm.
    """

    # NOTE: ``__metaclass__`` is only honoured by Python 2; on Python 3 it
    # is a plain class attribute and abstractness is not enforced.
    __metaclass__ = ABCMeta

    @abstractmethod
    def signature_size(self):
        """Size of the signatures produced by signature()."""

    @abstractmethod
    def signature(self, data):
        """Compute the signature of data."""
39
40
41
class Verifier(object):
    """
    Interface implemented by every signature-checking algorithm.
    """

    # NOTE: ``__metaclass__`` is only honoured by Python 2; on Python 3 it
    # is a plain class attribute and abstractness is not enforced.
    __metaclass__ = ABCMeta

    @abstractmethod
    def signature_size(self):
        """Size of the signatures accepted by verify()."""

    @abstractmethod
    def verify(self, data, signature):
        """Check that signature is a valid signature of data."""
55
56
57
class Encryptor(object):
    """
    Interface implemented by every encryption algorithm.
    """

    # NOTE: ``__metaclass__`` is only honoured by Python 2; on Python 3 it
    # is a plain class attribute and abstractness is not enforced.
    __metaclass__ = ABCMeta

    @abstractmethod
    def plain_block_size(self):
        """Size of one block of plain text."""

    @abstractmethod
    def encrypted_block_size(self):
        """Size of one block of encrypted text."""

    @abstractmethod
    def encrypt(self, data):
        """Encrypt data and return the result."""
75
76
77
class Decryptor(object):
    """
    Interface implemented by every decryption algorithm.
    """

    # NOTE: ``__metaclass__`` is only honoured by Python 2; on Python 3 it
    # is a plain class attribute and abstractness is not enforced.
    __metaclass__ = ABCMeta

    @abstractmethod
    def plain_block_size(self):
        """Size of one block of plain text."""

    @abstractmethod
    def encrypted_block_size(self):
        """Size of one block of encrypted text."""

    @abstractmethod
    def decrypt(self, data):
        """Decrypt data and return the result."""
95
96
97
class Cryptography(CryptographyNone):
    """
    Security layer for the Sign and SignAndEncrypt message modes.

    The Signer, Verifier, Encryptor and Decryptor attributes start out as
    None; the methods below delegate to whatever objects are assigned to
    them afterwards. Encryption-related methods are no-ops unless the
    mode is SignAndEncrypt.
    """

    def __init__(self, mode=MessageSecurityMode.Sign):
        self.Signer = self.Verifier = None
        self.Encryptor = self.Decryptor = None
        allowed_modes = (MessageSecurityMode.Sign,
                         MessageSecurityMode.SignAndEncrypt)
        assert mode in allowed_modes
        self.is_encrypted = (mode == MessageSecurityMode.SignAndEncrypt)

    def plain_block_size(self):
        """
        Plain text block size of the block cipher (1 when not encrypting).
        """
        if not self.is_encrypted:
            return 1
        return self.Encryptor.plain_block_size()

    def encrypted_block_size(self):
        """
        Encrypted block size of the block cipher (1 when not encrypting).
        """
        if not self.is_encrypted:
            return 1
        return self.Encryptor.encrypted_block_size()

    def padding(self, size):
        """
        Build the padding bytes for a body of the given size.

        The padding is chosen so that
            size + len(padding) + signature_size()
        is a whole number of plain blocks. Every padding byte, including
        the first one, holds the number of bytes that follow that first
        byte. Returns b'' when not encrypting.
        """
        if not self.is_encrypted:
            return b''
        block = self.Encryptor.plain_block_size()
        # +1 accounts for the leading padding-size byte itself
        overflow = (size + self.signature_size() + 1) % block
        fill = block - overflow if overflow != 0 else 0
        return bytes(bytearray([fill] * (fill + 1)))

    def min_padding_size(self):
        """
        Smallest possible padding: 1 byte when encrypting, otherwise 0.
        """
        return 1 if self.is_encrypted else 0

    def signature_size(self):
        """Signature size as reported by the Signer."""
        signer = self.Signer
        return signer.signature_size()

    def signature(self, data):
        """Sign data with the Signer and return the signature."""
        signer = self.Signer
        return signer.signature(data)

    def vsignature_size(self):
        """Signature size as reported by the Verifier."""
        verifier = self.Verifier
        return verifier.signature_size()

    def verify(self, data, sig):
        """Ask the Verifier to check sig against data; returns nothing."""
        verifier = self.Verifier
        verifier.verify(data, sig)

    def encrypt(self, data):
        """
        Encrypt data with the Encryptor; data is returned as is when
        not encrypting. The input must already be block-aligned.
        """
        if not self.is_encrypted:
            return data
        assert len(data) % self.Encryptor.plain_block_size() == 0
        return self.Encryptor.encrypt(data)

    def decrypt(self, data):
        """
        Decrypt data with the Decryptor; data is returned as is when
        not encrypting.
        """
        if not self.is_encrypted:
            return data
        return self.Decryptor.decrypt(data)

    def remove_padding(self, data):
        """
        Strip the trailing padding from data when encrypting.

        The last byte holds the padding value; that many bytes plus the
        size byte itself are removed.
        """
        if not self.is_encrypted:
            return data
        last_byte = bytearray(data[-1:])[0]
        strip = last_byte + 1
        return data[:-strip]
173
174
175
class SignerRsa(Signer):
    """
    RSA signer using PKCS#1 v1.5 padding with SHA1.
    """

    def __init__(self, client_pk):
        """
        client_pk: PEM-encoded, unencrypted RSA private key (bytes).

        Raises Exception if the cryptography package is not installed.
        """
        require_cryptography(self)
        self.client_pk = serialization.load_pem_private_key(
                client_pk, None, default_backend())
        # key_size is reported in bits; signatures are key-sized in bytes
        self.key_size = self.client_pk.key_size // 8

    def signature_size(self):
        """Return the signature length in bytes (the RSA key length)."""
        return self.key_size

    def signature(self, data):
        """
        Return the RSA PKCS#1 v1.5 / SHA1 signature of data.

        Uses the one-shot sign() API: the streaming signer() context was
        deprecated and later removed from the cryptography package.
        """
        return self.client_pk.sign(data, padding.PKCS1v15(), hashes.SHA1())
189
190
191
class VerifierRsa(Verifier):
    """
    RSA signature verifier using PKCS#1 v1.5 padding with SHA1.
    """

    def __init__(self, server_cert):
        """
        server_cert: DER-encoded X.509 certificate (bytes) whose public
        key is used for verification.

        Raises Exception if the cryptography package is not installed.
        """
        require_cryptography(self)
        self.server_cert = x509.load_der_x509_certificate(
                server_cert, default_backend())
        # key_size is reported in bits; signatures are key-sized in bytes
        self.key_size = self.server_cert.public_key().key_size // 8

    def signature_size(self):
        """Return the signature length in bytes (the RSA key length)."""
        return self.key_size

    def verify(self, data, signature):
        """
        Check signature against data with the certificate's public key.

        Raises cryptography.exceptions.InvalidSignature on mismatch.
        Uses the one-shot verify() API: the streaming verifier() context
        was deprecated and later removed from the cryptography package.
        """
        self.server_cert.public_key().verify(
                signature, data, padding.PKCS1v15(), hashes.SHA1())
206
207
208
class EncryptorRsa(Encryptor):
    """
    Block-wise RSA encryption with the public key of a certificate.
    """

    def __init__(self, server_cert, padding_algorithm, padding_size):
        """
        server_cert: DER-encoded X.509 certificate (bytes).
        padding_algorithm: padding object passed to every encrypt call.
        padding_size: number of bytes the padding scheme takes out of
            each key-sized block.

        Raises Exception if the cryptography package is not installed.
        """
        require_cryptography(self)
        self.server_cert = x509.load_der_x509_certificate(
                server_cert, default_backend())
        # key_size is reported in bits
        self.key_size = self.server_cert.public_key().key_size // 8
        self.padding = padding_algorithm
        self.padding_size = padding_size

    def plain_block_size(self):
        """Return how many plain text bytes fit into one RSA block."""
        return self.key_size - self.padding_size

    def encrypted_block_size(self):
        """Return the size of one encrypted block (the key length)."""
        return self.key_size

    def encrypt(self, data):
        """
        Encrypt data in chunks of plain_block_size() bytes and return the
        concatenated cipher text.
        """
        block_size = self.plain_block_size()
        # extract the public key once instead of once per block
        public_key = self.server_cert.public_key()
        # join() avoids re-copying the growing result on every block
        return b''.join(
            public_key.encrypt(data[i:i + block_size], self.padding)
            for i in range(0, len(data), block_size))
230
231
232
class DecryptorRsa(Decryptor):
    """
    Block-wise RSA decryption with a private key.
    """

    def __init__(self, client_pk, padding_algorithm, padding_size):
        """
        client_pk: PEM-encoded, unencrypted RSA private key (bytes).
        padding_algorithm: padding object passed to every decrypt call.
        padding_size: number of bytes the padding scheme takes out of
            each key-sized block.

        Raises Exception if the cryptography package is not installed.
        """
        require_cryptography(self)
        self.client_pk = serialization.load_pem_private_key(
                client_pk, None, default_backend())
        # key_size is reported in bits
        self.key_size = self.client_pk.key_size // 8
        self.padding = padding_algorithm
        self.padding_size = padding_size

    def plain_block_size(self):
        """Return how many plain text bytes fit into one RSA block."""
        return self.key_size - self.padding_size

    def encrypted_block_size(self):
        """Return the size of one encrypted block (the key length)."""
        return self.key_size

    def decrypt(self, data):
        """
        Decrypt data in chunks of encrypted_block_size() bytes and return
        the concatenated plain text.
        """
        block_size = self.encrypted_block_size()
        private_key = self.client_pk
        # join() avoids re-copying the growing result on every block
        return b''.join(
            private_key.decrypt(data[i:i + block_size], self.padding)
            for i in range(0, len(data), block_size))
254
255
256
class SignerAesCbc(Signer):
    """
    Symmetric signer: the signature is the HMAC-SHA1 of the data.
    """

    def __init__(self, key):
        require_cryptography(self)
        self.key = key

    def signature_size(self):
        """Return the digest size of SHA1."""
        digest_size = hashes.SHA1.digest_size
        return digest_size

    def signature(self, data):
        """Return the HMAC-SHA1 of data computed with the stored key."""
        mac = hmac.HMAC(self.key, hashes.SHA1(), backend=default_backend())
        mac.update(data)
        digest = mac.finalize()
        return digest
268
269
270
class VerifierAesCbc(Verifier):
    """
    Symmetric verifier: checks an HMAC-SHA1 signature of the data.
    """

    def __init__(self, key):
        """
        key: secret HMAC key (bytes).

        Raises Exception if the cryptography package is not installed.
        """
        require_cryptography(self)
        self.key = key

    def signature_size(self):
        """Return the digest size of SHA1."""
        return hashes.SHA1.digest_size

    def verify(self, data, signature):
        """
        Check that signature is the HMAC-SHA1 of data under the stored key.

        Raises cryptography.exceptions.InvalidSignature on mismatch.
        """
        hasher = hmac.HMAC(self.key, hashes.SHA1(), backend=default_backend())
        hasher.update(data)
        # HMAC.verify() compares in constant time; a plain != on the
        # digests would leak how many leading bytes matched.
        hasher.verify(signature)
284
285
286
class EncryptorAesCbc(Encryptor):
    """
    AES encryption in CBC mode.
    """

    def __init__(self, key, init_vec):
        """
        key: AES key (bytes).
        init_vec: CBC initialization vector (bytes).

        Raises Exception if the cryptography package is not installed.
        """
        require_cryptography(self)
        self.cipher = Cipher(algorithms.AES(key), modes.CBC(init_vec),
                             backend=default_backend())

    def plain_block_size(self):
        """
        Return the cipher block size in bytes.

        The AES block size does not depend on the key length, so it is
        taken from the algorithm's block_size rather than its key_size
        (which would yield 32 for a 256-bit key).
        """
        return self.cipher.algorithm.block_size // 8

    def encrypted_block_size(self):
        """Return the cipher block size in bytes (same as plain size)."""
        return self.cipher.algorithm.block_size // 8

    def encrypt(self, data):
        """Encrypt data with a fresh encryption context and return it."""
        encryptor = self.cipher.encryptor()
        return encryptor.update(data) + encryptor.finalize()
301
302
303
class DecryptorAesCbc(Decryptor):
    """
    AES decryption in CBC mode.
    """

    def __init__(self, key, init_vec):
        """
        key: AES key (bytes).
        init_vec: CBC initialization vector (bytes).

        Raises Exception if the cryptography package is not installed.
        """
        require_cryptography(self)
        self.cipher = Cipher(algorithms.AES(key), modes.CBC(init_vec),
                             backend=default_backend())

    def plain_block_size(self):
        """
        Return the cipher block size in bytes.

        The AES block size does not depend on the key length, so it is
        taken from the algorithm's block_size rather than its key_size
        (which would yield 32 for a 256-bit key).
        """
        return self.cipher.algorithm.block_size // 8

    def encrypted_block_size(self):
        """Return the cipher block size in bytes (same as plain size)."""
        return self.cipher.algorithm.block_size // 8

    def decrypt(self, data):
        """Decrypt data with a fresh decryption context and return it."""
        decryptor = self.cipher.decryptor()
        return decryptor.update(data) + decryptor.finalize()
318
319
320
def hash_hmac(key, message):
    """
    Return the HMAC-SHA1 digest of message computed with key.
    """
    mac = hmac.HMAC(key, hashes.SHA1(), backend=default_backend())
    mac.update(message)
    digest = mac.finalize()
    return digest
324
325
326
def p_sha1(key, body, sizes=()):
    """
    Expand key and body into a tuple of derived keys.

    HMAC output blocks are generated until there is enough material for
    all requested keys; the material is then cut, in order, into pieces
    whose lengths are given by sizes. Returns one key per entry of sizes.
    """
    wanted = sum(sizes)

    blocks = []
    produced = 0
    chain = body
    while produced < wanted:
        # chain value of this round is the HMAC of the previous one
        chain = hash_hmac(key, chain)
        block = hash_hmac(key, chain + body)
        blocks.append(block)
        produced += len(block)
    material = b''.join(blocks)

    keys = []
    for length in sizes:
        piece, material = material[:length], material[length:]
        keys.append(piece)
    return tuple(keys)
346
347
348
class SecurityPolicyBasic128Rsa15(SecurityPolicy):
    """
    Security policy Basic128Rsa15.

    Algorithm suite with RSA15 as key-wrap algorithm and 128-bit
    (16 bytes) symmetric encryption:
    - SymmetricSignatureAlgorithm - HmacSha1
      (http://www.w3.org/2000/09/xmldsig#hmac-sha1)
    - SymmetricEncryptionAlgorithm - Aes128
      (http://www.w3.org/2001/04/xmlenc#aes128-cbc)
    - AsymmetricSignatureAlgorithm - RsaSha1
      (http://www.w3.org/2000/09/xmldsig#rsa-sha1)
    - AsymmetricKeyWrapAlgorithm - KwRsa15
      (http://www.w3.org/2001/04/xmlenc#rsa-1_5)
    - AsymmetricEncryptionAlgorithm - Rsa15
      (http://www.w3.org/2001/04/xmlenc#rsa-1_5)
    - KeyDerivationAlgorithm - PSha1
      (http://docs.oasis-open.org/ws-sx/ws-secureconversation/200512/dk/p_sha1)
    - DerivedSignatureKeyLength - 128 (16 bytes)
    - MinAsymmetricKeyLength - 1024 (128 bytes)
    - MaxAsymmetricKeyLength - 2048 (256 bytes)
    - CertificateSignatureAlgorithm - Sha1

    A certificate, or any certificate in its chain, that is not signed
    with a hash that is Sha1 or stronger shall be rejected.
    """

    URI = "http://opcfoundation.org/UA/SecurityPolicy#Basic128Rsa15"
    signature_key_size = 16
    symmetric_key_size = 16

    def __init__(self, server_cert, client_cert, client_pk, mode):
        """
        Set up the asymmetric layer from the given certificate and key
        material; the symmetric layer is created empty and gets its
        algorithms in make_symmetric_key().
        """
        require_cryptography(self)
        # Secrets transmitted in OpenSecureChannel must be asymmetrically
        # encrypted even in Sign mode, hence SignAndEncrypt here.
        asymmetric = Cryptography(MessageSecurityMode.SignAndEncrypt)
        self.asymmetric_cryptography = asymmetric
        asymmetric.Signer = SignerRsa(client_pk)
        asymmetric.Verifier = VerifierRsa(server_cert)
        # 11 is the block overhead handed to the RSA classes for PKCS1v15
        asymmetric.Encryptor = EncryptorRsa(
                server_cert, padding.PKCS1v15(), 11)
        asymmetric.Decryptor = DecryptorRsa(
                client_pk, padding.PKCS1v15(), 11)
        self.symmetric_cryptography = Cryptography(mode)
        self.Mode = mode
        self.server_certificate = server_cert
        self.client_certificate = client_cert

    def make_symmetric_key(self, nonce1, nonce2):
        """
        Derive the symmetric keys from the two nonces with p_sha1() and
        install the resulting algorithms on the symmetric layer.

        Signer and Encryptor use the material derived with nonce2 as key
        and nonce1 as body; Verifier and Decryptor use the reverse.
        """
        # signing key, encryption key, 16-byte initialization vector
        lengths = (self.signature_key_size, self.symmetric_key_size, 16)
        symmetric = self.symmetric_cryptography

        sign_key, enc_key, enc_iv = p_sha1(nonce2, nonce1, lengths)
        symmetric.Signer = SignerAesCbc(sign_key)
        symmetric.Encryptor = EncryptorAesCbc(enc_key, enc_iv)

        check_key, dec_key, dec_iv = p_sha1(nonce1, nonce2, lengths)
        symmetric.Verifier = VerifierAesCbc(check_key)
        symmetric.Decryptor = DecryptorAesCbc(dec_key, dec_iv)
405
406
407
def oaep():
    """
    Build an OAEP padding object that uses SHA1 both for the MGF1 mask
    generation function and as the hash, with no label.
    """
    mask_generation = padding.MGF1(hashes.SHA1())
    return padding.OAEP(mgf=mask_generation, algorithm=hashes.SHA1(),
                        label=None)
409
410
411
class SecurityPolicyBasic256(SecurityPolicy):
    """
    Security policy Basic256.

    Algorithm suite for 256-bit (32 bytes) symmetric encryption:
    - SymmetricSignatureAlgorithm - HmacSha1
      (http://www.w3.org/2000/09/xmldsig#hmac-sha1)
    - SymmetricEncryptionAlgorithm - Aes256
      (http://www.w3.org/2001/04/xmlenc#aes256-cbc)
    - AsymmetricSignatureAlgorithm - RsaSha1
      (http://www.w3.org/2000/09/xmldsig#rsa-sha1)
    - AsymmetricKeyWrapAlgorithm - KwRsaOaep
      (http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p)
    - AsymmetricEncryptionAlgorithm - RsaOaep
      (http://www.w3.org/2001/04/xmlenc#rsa-oaep)
    - KeyDerivationAlgorithm - PSha1
      (http://docs.oasis-open.org/ws-sx/ws-secureconversation/200512/dk/p_sha1)
    - DerivedSignatureKeyLength - 192 (24 bytes)
    - MinAsymmetricKeyLength - 1024 (128 bytes)
    - MaxAsymmetricKeyLength - 2048 (256 bytes)
    - CertificateSignatureAlgorithm - Sha1

    A certificate, or any certificate in its chain, that is not signed
    with a hash that is Sha1 or stronger shall be rejected.
    """

    URI = "http://opcfoundation.org/UA/SecurityPolicy#Basic256"
    signature_key_size = 24
    symmetric_key_size = 32

    def __init__(self, server_cert, client_cert, client_pk, mode):
        """
        Set up the asymmetric layer from the given certificate and key
        material; the symmetric layer is created empty and gets its
        algorithms in make_symmetric_key().
        """
        require_cryptography(self)
        # Secrets transmitted in OpenSecureChannel must be asymmetrically
        # encrypted even in Sign mode, hence SignAndEncrypt here.
        asymmetric = Cryptography(MessageSecurityMode.SignAndEncrypt)
        self.asymmetric_cryptography = asymmetric
        asymmetric.Signer = SignerRsa(client_pk)
        asymmetric.Verifier = VerifierRsa(server_cert)
        # 42 is the block overhead handed to the RSA classes for OAEP
        asymmetric.Encryptor = EncryptorRsa(server_cert, oaep(), 42)
        asymmetric.Decryptor = DecryptorRsa(client_pk, oaep(), 42)
        self.symmetric_cryptography = Cryptography(mode)
        self.Mode = mode
        self.server_certificate = server_cert
        self.client_certificate = client_cert

    def make_symmetric_key(self, nonce1, nonce2):
        """
        Derive the symmetric keys from the two nonces with p_sha1() and
        install the resulting algorithms on the symmetric layer.

        Signer and Encryptor use the material derived with nonce2 as key
        and nonce1 as body; Verifier and Decryptor use the reverse.
        """
        # signing key, encryption key, 16-byte initialization vector
        lengths = (self.signature_key_size, self.symmetric_key_size, 16)
        symmetric = self.symmetric_cryptography

        sign_key, enc_key, enc_iv = p_sha1(nonce2, nonce1, lengths)
        symmetric.Signer = SignerAesCbc(sign_key)
        symmetric.Encryptor = EncryptorAesCbc(enc_key, enc_iv)

        check_key, dec_key, dec_iv = p_sha1(nonce1, nonce2, lengths)
        symmetric.Verifier = VerifierAesCbc(check_key)
        symmetric.Decryptor = DecryptorAesCbc(dec_key, dec_iv)
468