Passed
Push — master ( 4a3463...98fd52 )
by Olivier
02:51 queued 10s
created

SecurityPolicyBasic256.make_local_symmetric_key()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 5

Duplication

Lines 7
Ratio 100 %

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 3
dl 7
loc 7
rs 10
c 0
b 0
f 0
1
import logging
2
3
from abc import ABCMeta, abstractmethod
4
from ..ua import CryptographyNone, SecurityPolicy, MessageSecurityMode, UaError
5
try:
6
    from ..crypto import uacrypto
7
    CRYPTOGRAPHY_AVAILABLE = True
8
except ImportError:
9
    CRYPTOGRAPHY_AVAILABLE = False
10
11
12
# URI of the OPC UA "None" security policy; encrypt_asymmetric() in this
# module returns its input unchanged when given this URI.
POLICY_NONE_URI = 'http://opcfoundation.org/UA/SecurityPolicy#None'
13
14
15
def require_cryptography(obj):
    """
    Guard for classes that depend on the optional cryptography support.

    Meant to be called from constructors. Does nothing when the module-level
    CRYPTOGRAPHY_AVAILABLE flag is true; otherwise raises UaError naming the
    class of ``obj``.
    """
    if CRYPTOGRAPHY_AVAILABLE:
        return
    class_name = obj.__class__.__name__
    raise UaError("Can't use {0}, cryptography module is not installed".format(class_name))
22
23
24
class Signer(metaclass=ABCMeta):
    """
    Abstract base class for cryptographic signature algorithm

    ABCMeta is supplied through the ``metaclass`` class keyword: the
    Python 2 style ``__metaclass__`` attribute is ignored by Python 3, which
    left the ``@abstractmethod`` markers unenforced. Subclasses must now
    implement every abstract method before they can be instantiated.
    """

    @abstractmethod
    def signature_size(self):
        """Return the size of the signatures produced by this signer."""

    @abstractmethod
    def signature(self, data):
        """Return the signature computed over ``data``."""
38
39
40
class Verifier(metaclass=ABCMeta):
    """
    Abstract base class for cryptographic signature verification

    ABCMeta is supplied through the ``metaclass`` class keyword: the
    Python 2 style ``__metaclass__`` attribute is ignored by Python 3, which
    left the ``@abstractmethod`` markers unenforced. Subclasses must now
    implement every abstract method before they can be instantiated.
    """

    @abstractmethod
    def signature_size(self):
        """Return the size of the signatures this verifier checks."""

    @abstractmethod
    def verify(self, data, signature):
        """Check ``signature`` against ``data``."""
54
55
56
class Encryptor(metaclass=ABCMeta):
    """
    Abstract base class for encryption algorithm

    ABCMeta is supplied through the ``metaclass`` class keyword: the
    Python 2 style ``__metaclass__`` attribute is ignored by Python 3, which
    left the ``@abstractmethod`` markers unenforced. Subclasses must now
    implement every abstract method before they can be instantiated.
    """

    @abstractmethod
    def plain_block_size(self):
        """Return the size of one plain text block."""

    @abstractmethod
    def encrypted_block_size(self):
        """Return the size of one encrypted block."""

    @abstractmethod
    def encrypt(self, data):
        """Return the encrypted form of ``data``."""
74
75
76
class Decryptor(metaclass=ABCMeta):
    """
    Abstract base class for decryption algorithm

    ABCMeta is supplied through the ``metaclass`` class keyword: the
    Python 2 style ``__metaclass__`` attribute is ignored by Python 3, which
    left the ``@abstractmethod`` markers unenforced. Subclasses must now
    implement every abstract method before they can be instantiated.
    """

    @abstractmethod
    def plain_block_size(self):
        """Return the size of one plain text block."""

    @abstractmethod
    def encrypted_block_size(self):
        """Return the size of one encrypted block."""

    @abstractmethod
    def decrypt(self, data):
        """Return the decrypted form of ``data``."""
94
95
96
class Cryptography(CryptographyNone):
    """
    Security policy: Sign or SignAndEncrypt

    Delegates signing, verification, encryption and decryption to the
    objects stored in the ``Signer``, ``Verifier``, ``Encryptor`` and
    ``Decryptor`` attributes. These start as None and must be assigned by
    the owner before the corresponding methods are used.
    """

    def __init__(self, mode=MessageSecurityMode.Sign):
        """
        :param mode: MessageSecurityMode.Sign or
            MessageSecurityMode.SignAndEncrypt
        :raises ValueError: for any other mode
        """
        self.Signer = None
        self.Verifier = None
        self.Encryptor = None
        self.Decryptor = None
        if mode not in (MessageSecurityMode.Sign,
                        MessageSecurityMode.SignAndEncrypt):
            raise ValueError(f"unknown security mode {mode}")
        self.is_encrypted = (mode == MessageSecurityMode.SignAndEncrypt)

    def plain_block_size(self):
        """
        Size of plain text block for block cipher.
        Returns 1 when encryption is disabled.
        """
        if self.is_encrypted:
            return self.Encryptor.plain_block_size()
        return 1

    def encrypted_block_size(self):
        """
        Size of encrypted text block for block cipher.
        Returns 1 when encryption is disabled.
        """
        if self.is_encrypted:
            return self.Encryptor.encrypted_block_size()
        return 1

    def padding(self, size):
        """
        Create padding for a block of given size.
        plain_size = size + len(padding) + signature_size()
        plain_size = N * plain_block_size()

        The result is ``rem + 1`` bytes that all hold the value ``rem``;
        it is empty when encryption is disabled.
        """
        if not self.is_encrypted:
            return b''
        block_size = self.Encryptor.plain_block_size()
        # the "+ 1" accounts for the byte that carries the padding count
        rem = (size + self.signature_size() + 1) % block_size
        if rem != 0:
            rem = block_size - rem
        # NOTE(review): bytearray([rem]) raises ValueError once rem > 255,
        # which is reachable when plain_block_size() exceeds 256 - confirm
        # whether a second (extra) padding size byte has to be supported.
        return bytes(bytearray([rem])) * (rem + 1)

    def min_padding_size(self):
        """
        Smallest possible length of padding(): 1 when encrypting, else 0.
        """
        if self.is_encrypted:
            return 1
        return 0

    def signature_size(self):
        """Signature size reported by the Signer."""
        return self.Signer.signature_size()

    def signature(self, data):
        """Signature of ``data`` computed by the Signer."""
        return self.Signer.signature(data)

    def vsignature_size(self):
        """Signature size reported by the Verifier."""
        return self.Verifier.signature_size()

    def verify(self, data, sig):
        """
        Let the Verifier check ``sig`` against ``data``.
        Returns nothing; the Verifier's return value is discarded.
        """
        self.Verifier.verify(data, sig)

    def encrypt(self, data):
        """
        Encrypt ``data`` with the Encryptor, or return it unchanged when
        encryption is disabled.

        :raises ValueError: if len(data) is not a multiple of the
            Encryptor's plain block size
        """
        if self.is_encrypted:
            block_size = self.Encryptor.plain_block_size()
            if len(data) % block_size != 0:
                # say what went wrong instead of raising a bare ValueError
                raise ValueError(
                    f"data length {len(data)} is not a multiple of "
                    f"the plain block size {block_size}")
            return self.Encryptor.encrypt(data)
        return data

    def decrypt(self, data):
        """
        Decrypt ``data`` with the Decryptor, or return it unchanged when
        encryption is disabled.
        """
        if self.is_encrypted:
            return self.Decryptor.decrypt(data)
        return data

    def remove_padding(self, data):
        """
        Strip the padding described by the last byte of ``data``.

        The last byte holds the padding count; that many bytes plus the
        count byte itself are removed. ``data`` is returned unchanged when
        encryption is disabled.
        """
        if self.is_encrypted:
            pad_size = bytearray(data[-1:])[0] + 1
            return data[:-pad_size]
        return data
175
176
177
class SignerRsa(Signer):
    """
    Signer that delegates to ``uacrypto.sign_sha1`` with a private key.
    """

    def __init__(self, client_pk):
        """
        Keep the signing key and derive the signature size from it.

        require_cryptography() raises UaError when the cryptography module
        is unavailable.
        """
        require_cryptography(self)
        self.client_pk = client_pk
        # key_size // 8: presumably bits -> bytes
        self.key_size = client_pk.key_size // 8

    def signature_size(self):
        """Return the size derived from the key in the constructor."""
        return self.key_size

    def signature(self, data):
        """Return ``uacrypto.sign_sha1`` of ``data`` using the stored key."""
        private_key = self.client_pk
        return uacrypto.sign_sha1(private_key, data)
189
190
191
class VerifierRsa(Verifier):
    """
    Verifier that delegates to ``uacrypto.verify_sha1`` with a certificate.
    """

    def __init__(self, server_cert):
        """
        Keep the certificate and derive the signature size from its public key.

        require_cryptography() raises UaError when the cryptography module
        is unavailable.
        """
        require_cryptography(self)
        self.server_cert = server_cert
        public_key = server_cert.public_key()
        # key_size // 8: presumably bits -> bytes
        self.key_size = public_key.key_size // 8

    def signature_size(self):
        """Return the size derived from the public key in the constructor."""
        return self.key_size

    def verify(self, data, signature):
        """
        Hand ``data`` and ``signature`` to ``uacrypto.verify_sha1``.
        Returns nothing; the helper's return value is discarded.
        """
        certificate = self.server_cert
        uacrypto.verify_sha1(certificate, data, signature)
203
204
205
class EncryptorRsa(Encryptor):
    """
    Block-wise asymmetric encryptor using a certificate's public key.

    The actual encryption primitive is injected as ``enc_fn`` and called as
    ``enc_fn(public_key, block)``.
    """

    def __init__(self, server_cert, enc_fn, padding_size):
        """
        :param server_cert: certificate object exposing ``public_key()``
        :param enc_fn: callable ``(public_key, data) -> encrypted data``
        :param padding_size: amount subtracted from the key size to obtain
            the plain block size
        """
        require_cryptography(self)
        self.server_cert = server_cert
        # key_size // 8: presumably bits -> bytes
        self.key_size = self.server_cert.public_key().key_size // 8
        self.encryptor = enc_fn
        self.padding_size = padding_size

    def plain_block_size(self):
        """Return key size minus the configured padding size."""
        return self.key_size - self.padding_size

    def encrypted_block_size(self):
        """Return the key size."""
        return self.key_size

    def encrypt(self, data):
        """
        Encrypt ``data`` in chunks of plain_block_size() and return the
        concatenated results as bytes.
        """
        block_size = self.plain_block_size()
        # loop-invariant: fetch the public key once instead of per block
        public_key = self.server_cert.public_key()
        # join avoids the quadratic cost of repeated bytes concatenation
        return b''.join(
            self.encryptor(public_key, data[start: start + block_size])
            for start in range(0, len(data), block_size)
        )
227
228
229
class DecryptorRsa(Decryptor):
    """
    Block-wise asymmetric decryptor using a private key.

    The actual decryption primitive is injected as ``dec_fn`` and called as
    ``dec_fn(private_key, block)``.
    """

    def __init__(self, client_pk, dec_fn, padding_size):
        """
        :param client_pk: private key object exposing ``key_size``
        :param dec_fn: callable ``(private_key, data) -> decrypted data``
        :param padding_size: amount subtracted from the key size to obtain
            the plain block size
        """
        require_cryptography(self)
        self.client_pk = client_pk
        # key_size // 8: presumably bits -> bytes
        self.key_size = self.client_pk.key_size // 8
        self.decryptor = dec_fn
        self.padding_size = padding_size

    def plain_block_size(self):
        """Return key size minus the configured padding size."""
        return self.key_size - self.padding_size

    def encrypted_block_size(self):
        """Return the key size."""
        return self.key_size

    def decrypt(self, data):
        """
        Decrypt ``data`` in chunks of encrypted_block_size() and return the
        concatenated results as bytes.
        """
        block_size = self.encrypted_block_size()
        # join avoids the quadratic cost of repeated bytes concatenation
        return b''.join(
            self.decryptor(self.client_pk, data[start: start + block_size])
            for start in range(0, len(data), block_size)
        )
251
252
253
class SignerAesCbc(Signer):
    """
    Symmetric signer: signatures are ``uacrypto.hmac_sha1`` of the data.
    """

    def __init__(self, key):
        """
        Store the signing key.

        require_cryptography() raises UaError when the cryptography module
        is unavailable.
        """
        require_cryptography(self)
        self.key = key

    def signature_size(self):
        """Return the value of ``uacrypto.sha1_size()``."""
        size = uacrypto.sha1_size()
        return size

    def signature(self, data):
        """Return ``uacrypto.hmac_sha1`` of ``data`` under the stored key."""
        mac = uacrypto.hmac_sha1(self.key, data)
        return mac
264
265
266
class VerifierAesCbc(Verifier):
    """
    Symmetric verifier: a signature is valid when it equals
    ``uacrypto.hmac_sha1`` of the data under the stored key.
    """

    def __init__(self, key):
        """
        Store the verification key.

        require_cryptography() raises UaError when the cryptography module
        is unavailable.
        """
        require_cryptography(self)
        self.key = key

    def signature_size(self):
        """Return the value of ``uacrypto.sha1_size()``."""
        return uacrypto.sha1_size()

    def verify(self, data, signature):
        """
        Raise ``uacrypto.InvalidSignature`` unless ``signature`` matches the
        HMAC computed over ``data``.
        """
        # local import keeps this block independent of the module import list
        import hmac

        expected = uacrypto.hmac_sha1(self.key, data)
        try:
            # constant-time comparison: a plain != leaks, through timing,
            # how many leading bytes of a forged signature are correct
            matches = hmac.compare_digest(signature, expected)
        except TypeError:
            # operands compare_digest cannot handle are simply a mismatch,
            # as they were with the previous != comparison
            matches = False
        if not matches:
            raise uacrypto.InvalidSignature
279
280
281
class EncryptorAesCbc(Encryptor):
    """
    Symmetric encryptor built on ``uacrypto.cipher_aes_cbc``.
    """

    def __init__(self, key, init_vec):
        """
        Create the cipher object from ``key`` and ``init_vec``.

        require_cryptography() raises UaError when the cryptography module
        is unavailable.
        """
        require_cryptography(self)
        self.cipher = uacrypto.cipher_aes_cbc(key, init_vec)

    def plain_block_size(self):
        """Return the cipher algorithm's key_size divided by 8."""
        key_size = self.cipher.algorithm.key_size
        return key_size // 8

    def encrypted_block_size(self):
        """Return the cipher algorithm's key_size divided by 8."""
        key_size = self.cipher.algorithm.key_size
        return key_size // 8

    def encrypt(self, data):
        """Return ``uacrypto.cipher_encrypt`` of ``data`` with the stored cipher."""
        cipher = self.cipher
        return uacrypto.cipher_encrypt(cipher, data)
295
296
297
class DecryptorAesCbc(Decryptor):
    """
    Symmetric decryptor built on ``uacrypto.cipher_aes_cbc``.
    """

    def __init__(self, key, init_vec):
        """
        Create the cipher object from ``key`` and ``init_vec``.

        require_cryptography() raises UaError when the cryptography module
        is unavailable.
        """
        require_cryptography(self)
        self.cipher = uacrypto.cipher_aes_cbc(key, init_vec)

    def plain_block_size(self):
        """Return the cipher algorithm's key_size divided by 8."""
        key_size = self.cipher.algorithm.key_size
        return key_size // 8

    def encrypted_block_size(self):
        """Return the cipher algorithm's key_size divided by 8."""
        key_size = self.cipher.algorithm.key_size
        return key_size // 8

    def decrypt(self, data):
        """Return ``uacrypto.cipher_decrypt`` of ``data`` with the stored cipher."""
        cipher = self.cipher
        return uacrypto.cipher_decrypt(cipher, data)
311
312
class SignerSha256(Signer):
    """
    Signer that delegates to ``uacrypto.sign_sha256`` with a private key.
    """

    def __init__(self, client_pk):
        """
        Keep the signing key and derive the signature size from it.

        require_cryptography() raises UaError when the cryptography module
        is unavailable.
        """
        require_cryptography(self)
        self.client_pk = client_pk
        # key_size // 8: presumably bits -> bytes
        self.key_size = client_pk.key_size // 8

    def signature_size(self):
        """Return the size derived from the key in the constructor."""
        return self.key_size

    def signature(self, data):
        """Return ``uacrypto.sign_sha256`` of ``data`` using the stored key."""
        private_key = self.client_pk
        return uacrypto.sign_sha256(private_key, data)
324
325
class VerifierSha256(Verifier):
    """
    Verifier that delegates to ``uacrypto.verify_sha256`` with a certificate.
    """

    def __init__(self, server_cert):
        """
        Keep the certificate and derive the signature size from its public key.

        require_cryptography() raises UaError when the cryptography module
        is unavailable.
        """
        require_cryptography(self)
        self.server_cert = server_cert
        public_key = server_cert.public_key()
        # key_size // 8: presumably bits -> bytes
        self.key_size = public_key.key_size // 8

    def signature_size(self):
        """Return the size derived from the public key in the constructor."""
        return self.key_size

    def verify(self, data, signature):
        """
        Hand ``data`` and ``signature`` to ``uacrypto.verify_sha256``.
        Returns nothing; the helper's return value is discarded.
        """
        certificate = self.server_cert
        uacrypto.verify_sha256(certificate, data, signature)
337
338
class SignerHMac256(Signer):
    """
    Symmetric signer: signatures are ``uacrypto.hmac_sha256`` of the data.
    """

    def __init__(self, key):
        """
        Store the signing key.

        require_cryptography() raises UaError when the cryptography module
        is unavailable.
        """
        require_cryptography(self)
        self.key = key

    def signature_size(self):
        """Return the value of ``uacrypto.sha256_size()``."""
        size = uacrypto.sha256_size()
        return size

    def signature(self, data):
        """Return ``uacrypto.hmac_sha256`` of ``data`` under the stored key."""
        mac = uacrypto.hmac_sha256(self.key, data)
        return mac
349
350
351
class VerifierHMac256(Verifier):
    """
    Symmetric verifier: a signature is valid when it equals
    ``uacrypto.hmac_sha256`` of the data under the stored key.
    """

    def __init__(self, key):
        """
        Store the verification key.

        require_cryptography() raises UaError when the cryptography module
        is unavailable.
        """
        require_cryptography(self)
        self.key = key

    def signature_size(self):
        """Return the value of ``uacrypto.sha256_size()``."""
        return uacrypto.sha256_size()

    def verify(self, data, signature):
        """
        Raise ``uacrypto.InvalidSignature`` unless ``signature`` matches the
        HMAC computed over ``data``.
        """
        # local import keeps this block independent of the module import list
        import hmac

        expected = uacrypto.hmac_sha256(self.key, data)
        try:
            # constant-time comparison: a plain != leaks, through timing,
            # how many leading bytes of a forged signature are correct
            matches = hmac.compare_digest(signature, expected)
        except TypeError:
            # operands compare_digest cannot handle are simply a mismatch,
            # as they were with the previous != comparison
            matches = False
        if not matches:
            raise uacrypto.InvalidSignature
364
365
class SecurityPolicyBasic128Rsa15(SecurityPolicy):
    """
    DEPRECATED, do not use anymore!

    Security Basic 128Rsa15
    Algorithm suite with RSA15 as key-wrap algorithm and 128-bit (16 bytes)
    encryption algorithms:
    - SymmetricSignatureAlgorithm - HmacSha1
      (http://www.w3.org/2000/09/xmldsig#hmac-sha1)
    - SymmetricEncryptionAlgorithm - Aes128
      (http://www.w3.org/2001/04/xmlenc#aes128-cbc)
    - AsymmetricSignatureAlgorithm - RsaSha1
      (http://www.w3.org/2000/09/xmldsig#rsa-sha1)
    - AsymmetricKeyWrapAlgorithm - KwRsa15
      (http://www.w3.org/2001/04/xmlenc#rsa-1_5)
    - AsymmetricEncryptionAlgorithm - Rsa15
      (http://www.w3.org/2001/04/xmlenc#rsa-1_5)
    - KeyDerivationAlgorithm - PSha1
      (http://docs.oasis-open.org/ws-sx/ws-secureconversation/200512/dk/p_sha1)
    - DerivedSignatureKeyLength - 128 (16 bytes)
    - MinAsymmetricKeyLength - 1024 (128 bytes)
    - MaxAsymmetricKeyLength - 2048 (256 bytes)
    - CertificateSignatureAlgorithm - Sha1

    A certificate (or any certificate in its chain) that is not signed with
    a hash that is Sha1 or stronger shall be rejected.
    """

    URI = "http://opcfoundation.org/UA/SecurityPolicy#Basic128Rsa15"
    signature_key_size = 16
    symmetric_key_size = 16
    AsymmetricEncryptionURI = "http://www.w3.org/2001/04/xmlenc#rsa-1_5"
    AsymmetricSignatureURI = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"

    @staticmethod
    def encrypt_asymmetric(pubkey, data):
        """Encrypt ``data`` for ``pubkey`` with ``uacrypto.encrypt_rsa15``."""
        return uacrypto.encrypt_rsa15(pubkey, data)

    def __init__(self, server_cert, client_cert, client_pk, mode):
        """
        Set up the asymmetric primitives and an empty symmetric Cryptography.

        ``server_cert`` may be passed as DER bytes; it is then parsed with
        ``uacrypto.x509_from_der``. A deprecation warning is logged on every
        construction.
        """
        logging.getLogger(__name__).warning("DEPRECATED! Do not use SecurityPolicyBasic128Rsa15 anymore!")

        require_cryptography(self)
        if isinstance(server_cert, bytes):
            server_cert = uacrypto.x509_from_der(server_cert)
        # The asymmetric side is always SignAndEncrypt: secrets sent in
        # OpenSecureChannel are asymmetrically encrypted even in Sign mode.
        asym = self.asymmetric_cryptography = Cryptography(MessageSecurityMode.SignAndEncrypt)
        asym.Signer = SignerRsa(client_pk)
        asym.Verifier = VerifierRsa(server_cert)
        asym.Encryptor = EncryptorRsa(server_cert, uacrypto.encrypt_rsa15, 11)
        asym.Decryptor = DecryptorRsa(client_pk, uacrypto.decrypt_rsa15, 11)
        self.symmetric_cryptography = Cryptography(mode)
        self.Mode = mode
        self.server_certificate = uacrypto.der_from_x509(server_cert)
        self.client_certificate = uacrypto.der_from_x509(client_cert)

    def make_local_symmetric_key(self, secret, seed):
        """
        Derive keys with ``uacrypto.p_sha1`` and install the symmetric
        Signer and Encryptor.
        """
        lengths = (self.signature_key_size, self.symmetric_key_size, 16)

        signing_key, cipher_key, iv = uacrypto.p_sha1(secret, seed, lengths)
        sym = self.symmetric_cryptography
        sym.Signer = SignerAesCbc(signing_key)
        sym.Encryptor = EncryptorAesCbc(cipher_key, iv)

    def make_remote_symmetric_key(self, secret, seed):
        """
        Derive keys with ``uacrypto.p_sha1`` and install the symmetric
        Verifier and Decryptor.
        """
        lengths = (self.signature_key_size, self.symmetric_key_size, 16)

        signing_key, cipher_key, iv = uacrypto.p_sha1(secret, seed, lengths)
        sym = self.symmetric_cryptography
        sym.Verifier = VerifierAesCbc(signing_key)
        sym.Decryptor = DecryptorAesCbc(cipher_key, iv)
438
439
440 View Code Duplication
class SecurityPolicyBasic256(SecurityPolicy):
    """
    DEPRECATED, do not use anymore!

    Security Basic 256
    Algorithm suite for 256-bit (32 bytes) encryption; it includes:
    - SymmetricSignatureAlgorithm - HmacSha1
      (http://www.w3.org/2000/09/xmldsig#hmac-sha1)
    - SymmetricEncryptionAlgorithm - Aes256
      (http://www.w3.org/2001/04/xmlenc#aes256-cbc)
    - AsymmetricSignatureAlgorithm - RsaSha1
      (http://www.w3.org/2000/09/xmldsig#rsa-sha1)
    - AsymmetricKeyWrapAlgorithm - KwRsaOaep
      (http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p)
    - AsymmetricEncryptionAlgorithm - RsaOaep
      (http://www.w3.org/2001/04/xmlenc#rsa-oaep)
    - KeyDerivationAlgorithm - PSha1
      (http://docs.oasis-open.org/ws-sx/ws-secureconversation/200512/dk/p_sha1)
    - DerivedSignatureKeyLength - 192 (24 bytes)
    - MinAsymmetricKeyLength - 1024 (128 bytes)
    - MaxAsymmetricKeyLength - 2048 (256 bytes)
    - CertificateSignatureAlgorithm - Sha1

    A certificate (or any certificate in its chain) that is not signed with
    a hash that is Sha1 or stronger shall be rejected.
    """

    URI = "http://opcfoundation.org/UA/SecurityPolicy#Basic256"
    signature_key_size = 24
    symmetric_key_size = 32
    AsymmetricEncryptionURI = "http://www.w3.org/2001/04/xmlenc#rsa-oaep"
    AsymmetricSignatureURI = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"

    @staticmethod
    def encrypt_asymmetric(pubkey, data):
        """Encrypt ``data`` for ``pubkey`` with ``uacrypto.encrypt_rsa_oaep``."""
        return uacrypto.encrypt_rsa_oaep(pubkey, data)

    def __init__(self, server_cert, client_cert, client_pk, mode):
        """
        Set up the asymmetric primitives and an empty symmetric Cryptography.

        ``server_cert`` may be passed as DER bytes; it is then parsed with
        ``uacrypto.x509_from_der``. A deprecation warning is logged on every
        construction.
        """
        logging.getLogger(__name__).warning("DEPRECATED! Do not use SecurityPolicyBasic256 anymore!")

        require_cryptography(self)
        if isinstance(server_cert, bytes):
            server_cert = uacrypto.x509_from_der(server_cert)
        # The asymmetric side is always SignAndEncrypt: secrets sent in
        # OpenSecureChannel are asymmetrically encrypted even in Sign mode.
        asym = self.asymmetric_cryptography = Cryptography(MessageSecurityMode.SignAndEncrypt)
        asym.Signer = SignerRsa(client_pk)
        asym.Verifier = VerifierRsa(server_cert)
        asym.Encryptor = EncryptorRsa(server_cert, uacrypto.encrypt_rsa_oaep, 42)
        asym.Decryptor = DecryptorRsa(client_pk, uacrypto.decrypt_rsa_oaep, 42)
        self.symmetric_cryptography = Cryptography(mode)
        self.Mode = mode
        self.server_certificate = uacrypto.der_from_x509(server_cert)
        self.client_certificate = uacrypto.der_from_x509(client_cert)

    def make_local_symmetric_key(self, secret, seed):
        """
        Derive keys with ``uacrypto.p_sha1`` and install the symmetric
        Signer and Encryptor.
        """
        # specs part 6, 6.7.5
        lengths = (self.signature_key_size, self.symmetric_key_size, 16)

        signing_key, cipher_key, iv = uacrypto.p_sha1(secret, seed, lengths)
        sym = self.symmetric_cryptography
        sym.Signer = SignerAesCbc(signing_key)
        sym.Encryptor = EncryptorAesCbc(cipher_key, iv)

    def make_remote_symmetric_key(self, secret, seed):
        """
        Derive keys with ``uacrypto.p_sha1`` and install the symmetric
        Verifier and Decryptor.
        """
        # specs part 6, 6.7.5
        lengths = (self.signature_key_size, self.symmetric_key_size, 16)

        signing_key, cipher_key, iv = uacrypto.p_sha1(secret, seed, lengths)
        sym = self.symmetric_cryptography
        sym.Verifier = VerifierAesCbc(signing_key)
        sym.Decryptor = DecryptorAesCbc(cipher_key, iv)
516
517 View Code Duplication
class SecurityPolicyBasic256Sha256(SecurityPolicy):
    """
    Security Basic 256Sha256
    Algorithm suite with Sha256 as key-wrap algorithm and 256-bit (32 bytes)
    encryption algorithms.

    - SymmetricSignatureAlgorithm_HMAC-SHA2-256
      https://tools.ietf.org/html/rfc4634
    - SymmetricEncryptionAlgorithm_AES256-CBC
      http://www.w3.org/2001/04/xmlenc#aes256-cbc
    - AsymmetricSignatureAlgorithm_RSA-PKCS15-SHA2-256
      http://www.w3.org/2001/04/xmldsig-more#rsa-sha256
    - AsymmetricEncryptionAlgorithm_RSA-OAEP-SHA1
      http://www.w3.org/2001/04/xmlenc#rsa-oaep
    - KeyDerivationAlgorithm_P-SHA2-256
      http://docs.oasis-open.org/ws-sx/ws-secureconversation/200512/dk/p_sha256
    - CertificateSignatureAlgorithm_RSA-PKCS15-SHA2-256
      http://www.w3.org/2001/04/xmldsig-more#rsa-sha256
    - Basic256Sha256_Limits
        -> DerivedSignatureKeyLength: 256 bits
        -> MinAsymmetricKeyLength: 2048 bits
        -> MaxAsymmetricKeyLength: 4096 bits
        -> SecureChannelNonceLength: 32 bytes
    """

    URI = "http://opcfoundation.org/UA/SecurityPolicy#Basic256Sha256"
    signature_key_size = 32
    symmetric_key_size = 32
    AsymmetricEncryptionURI = "http://www.w3.org/2001/04/xmlenc#rsa-oaep"
    AsymmetricSignatureURI = "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256"

    @staticmethod
    def encrypt_asymmetric(pubkey, data):
        """Encrypt ``data`` for ``pubkey`` with ``uacrypto.encrypt_rsa_oaep``."""
        return uacrypto.encrypt_rsa_oaep(pubkey, data)

    def __init__(self, server_cert, client_cert, client_pk, mode):
        """
        Set up the asymmetric primitives and an empty symmetric Cryptography.

        ``server_cert`` may be passed as DER bytes; it is then parsed with
        ``uacrypto.x509_from_der``.
        """
        require_cryptography(self)
        if isinstance(server_cert, bytes):
            server_cert = uacrypto.x509_from_der(server_cert)
        # The asymmetric side is always SignAndEncrypt: secrets sent in
        # OpenSecureChannel are asymmetrically encrypted even in Sign mode.
        asym = self.asymmetric_cryptography = Cryptography(MessageSecurityMode.SignAndEncrypt)
        asym.Signer = SignerSha256(client_pk)
        asym.Verifier = VerifierSha256(server_cert)
        asym.Encryptor = EncryptorRsa(server_cert, uacrypto.encrypt_rsa_oaep, 42)
        asym.Decryptor = DecryptorRsa(client_pk, uacrypto.decrypt_rsa_oaep, 42)
        self.symmetric_cryptography = Cryptography(mode)
        self.Mode = mode
        self.server_certificate = uacrypto.der_from_x509(server_cert)
        self.client_certificate = uacrypto.der_from_x509(client_cert)

    def make_local_symmetric_key(self, secret, seed):
        """
        Derive keys with ``uacrypto.p_sha256`` and install the symmetric
        Signer and Encryptor.
        """
        # specs part 6, 6.7.5
        lengths = (self.signature_key_size, self.symmetric_key_size, 16)

        signing_key, cipher_key, iv = uacrypto.p_sha256(secret, seed, lengths)
        sym = self.symmetric_cryptography
        sym.Signer = SignerHMac256(signing_key)
        sym.Encryptor = EncryptorAesCbc(cipher_key, iv)

    def make_remote_symmetric_key(self, secret, seed):
        """
        Derive keys with ``uacrypto.p_sha256`` and install the symmetric
        Verifier and Decryptor.
        """
        # specs part 6, 6.7.5
        lengths = (self.signature_key_size, self.symmetric_key_size, 16)

        signing_key, cipher_key, iv = uacrypto.p_sha256(secret, seed, lengths)
        sym = self.symmetric_cryptography
        sym.Verifier = VerifierHMac256(signing_key)
        sym.Decryptor = DecryptorAesCbc(cipher_key, iv)
587
588
def encrypt_asymmetric(pubkey, data, policy_uri):
    """
    Asymmetrically encrypt ``data`` for ``pubkey``; ``policy_uri`` picks
    the algorithm.

    Returns a tuple (encrypted_data, algorithm_uri). For an empty
    ``policy_uri`` or the None policy the data is returned untouched with an
    empty algorithm URI. Any other unknown URI raises UaError.
    """
    known_policies = (
        SecurityPolicyBasic256Sha256,
        SecurityPolicyBasic256,
        SecurityPolicyBasic128Rsa15,
    )
    # first policy class whose URI matches, or None
    policy = next((p for p in known_policies if policy_uri == p.URI), None)
    if policy is not None:
        ciphertext = policy.encrypt_asymmetric(pubkey, data)
        return (ciphertext, policy.AsymmetricEncryptionURI)
    if policy_uri and policy_uri != POLICY_NONE_URI:
        raise UaError("Unsupported security policy `{0}`".format(policy_uri))
    return data, ''
601