Completed
Pull Request — master (#128)
by Alexander
02:25
created

encrypt_asymmetric()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125
Metric Value
cc 1
dl 0
loc 3
ccs 1
cts 2
cp 0.5
crap 1.125
rs 10
1 1
from abc import ABCMeta, abstractmethod
2 1
from opcua.ua import CryptographyNone, SecurityPolicy
3 1
from opcua.ua import MessageSecurityMode
4 1
from opcua.ua import UaError
5 1
try:
6 1
    from opcua.crypto import uacrypto
7
    CRYPTOGRAPHY_AVAILABLE = True
8 1
except ImportError:
9 1
    CRYPTOGRAPHY_AVAILABLE = False
10
11
12 1
def require_cryptography(obj):
    """
    Guard for objects that depend on the optional cryptography backend.

    Meant to be called from constructors. Does nothing when
    CRYPTOGRAPHY_AVAILABLE is true; otherwise raises UaError with a
    message naming the class of obj.
    """
    if CRYPTOGRAPHY_AVAILABLE:
        return
    class_name = obj.__class__.__name__
    raise UaError("Can't use {}, cryptography module is not installed".format(class_name))
19
20
21 1
class Signer(object):
    """
    Interface of a cryptographic signature algorithm.

    Concrete signers override both methods below.
    """

    __metaclass__ = ABCMeta

    @abstractmethod
    def signature_size(self):
        """Return the size of a signature produced by this signer."""

    @abstractmethod
    def signature(self, data):
        """Return the signature computed over data."""
35
36
37 1
class Verifier(object):
    """
    Interface of a cryptographic signature verification algorithm.

    Concrete verifiers override both methods below.
    """

    __metaclass__ = ABCMeta

    @abstractmethod
    def signature_size(self):
        """Return the size of a signature accepted by this verifier."""

    @abstractmethod
    def verify(self, data, signature):
        """Check signature against data."""
51
52
53 1
class Encryptor(object):
    """
    Interface of an encryption algorithm.

    Concrete encryptors override all three methods below.
    """

    __metaclass__ = ABCMeta

    @abstractmethod
    def plain_block_size(self):
        """Return the size of one block of plain text."""

    @abstractmethod
    def encrypted_block_size(self):
        """Return the size of one block of encrypted text."""

    @abstractmethod
    def encrypt(self, data):
        """Return data in encrypted form."""
71
72
73 1
class Decryptor(object):
    """
    Interface of a decryption algorithm.

    Concrete decryptors override all three methods below.
    """

    __metaclass__ = ABCMeta

    @abstractmethod
    def plain_block_size(self):
        """Return the size of one block of plain text."""

    @abstractmethod
    def encrypted_block_size(self):
        """Return the size of one block of encrypted text."""

    @abstractmethod
    def decrypt(self, data):
        """Return data in decrypted form."""
91
92
93 1
class Cryptography(CryptographyNone):
    """
    Message protection for the Sign and SignAndEncrypt security modes.

    The actual work is delegated to the objects stored in the Signer,
    Verifier, Encryptor and Decryptor attributes. They start out as None
    and have to be assigned before the corresponding methods are used.
    """

    def __init__(self, mode=MessageSecurityMode.Sign):
        self.Signer = self.Verifier = None
        self.Encryptor = self.Decryptor = None
        supported_modes = (MessageSecurityMode.Sign,
                           MessageSecurityMode.SignAndEncrypt)
        assert mode in supported_modes
        # Encryption is only active in SignAndEncrypt mode.
        self.is_encrypted = (mode == MessageSecurityMode.SignAndEncrypt)

    def plain_block_size(self):
        """
        Plain text block size of the block cipher, or 1 without encryption.
        """
        if not self.is_encrypted:
            return 1
        return self.Encryptor.plain_block_size()

    def encrypted_block_size(self):
        """
        Encrypted block size of the block cipher, or 1 without encryption.
        """
        if not self.is_encrypted:
            return 1
        return self.Encryptor.encrypted_block_size()

    def padding(self, size):
        """
        Build the padding for a body of the given size.

        The padding is chosen so that
        size + len(padding) + signature_size() is a whole number of
        plain text blocks. Without encryption the padding is empty.
        """
        if not self.is_encrypted:
            return b''
        block = self.Encryptor.plain_block_size()
        # The "+ 1" accounts for the leading padding-length byte.
        overflow = (size + self.signature_size() + 1) % block
        fill = block - overflow if overflow != 0 else 0
        # One length byte followed by `fill` bytes of the same value.
        return bytes(bytearray([fill])) * (fill + 1)

    def min_padding_size(self):
        """
        Smallest possible padding: 1 with encryption, 0 without.
        """
        return 1 if self.is_encrypted else 0

    def signature_size(self):
        """
        Signature size as reported by the Signer.
        """
        return self.Signer.signature_size()

    def signature(self, data):
        """
        Signature of data as computed by the Signer.
        """
        return self.Signer.signature(data)

    def vsignature_size(self):
        """
        Signature size as reported by the Verifier.
        """
        return self.Verifier.signature_size()

    def verify(self, data, sig):
        """
        Let the Verifier check sig against data; returns nothing.
        """
        self.Verifier.verify(data, sig)

    def encrypt(self, data):
        """
        Encrypt data with the Encryptor, or return it as is without encryption.

        With encryption, data must consist of whole plain text blocks.
        """
        if not self.is_encrypted:
            return data
        assert len(data) % self.Encryptor.plain_block_size() == 0
        return self.Encryptor.encrypt(data)

    def decrypt(self, data):
        """
        Decrypt data with the Decryptor, or return it as is without encryption.
        """
        if not self.is_encrypted:
            return data
        return self.Decryptor.decrypt(data)

    def remove_padding(self, data):
        """
        Strip the trailing padding; data is returned as is without encryption.
        """
        if not self.is_encrypted:
            return data
        # The last byte holds the padding value; one more byte is removed
        # for the padding-length byte itself.
        last_byte = bytearray(data[-1:])[0]
        return data[:-(last_byte + 1)]
170
171
172 1
class SignerRsa(Signer):
    """
    Signer that signs with the private key passed as client_pk.
    """

    def __init__(self, client_pk):
        require_cryptography(self)
        self.client_pk = client_pk
        key_bits = client_pk.key_size
        # key_size is stored in bytes.
        self.key_size = key_bits // 8

    def signature_size(self):
        """
        Signature size, equal to the stored key size.
        """
        return self.key_size

    def signature(self, data):
        """
        Sign data by passing it with the private key to uacrypto.sign_sha1.
        """
        private_key = self.client_pk
        return uacrypto.sign_sha1(private_key, data)
184
185
186 1
class VerifierRsa(Verifier):
    """
    Verifier that checks signatures against the certificate passed as
    server_cert.
    """

    def __init__(self, server_cert):
        require_cryptography(self)
        self.server_cert = server_cert
        key_bits = server_cert.public_key().key_size
        # key_size is stored in bytes.
        self.key_size = key_bits // 8

    def signature_size(self):
        """
        Signature size, equal to the stored key size.
        """
        return self.key_size

    def verify(self, data, signature):
        """
        Hand data and signature to uacrypto.verify_sha1; returns nothing.
        """
        certificate = self.server_cert
        uacrypto.verify_sha1(certificate, data, signature)
198
199
200 1
class EncryptorRsa(Encryptor):
    """
    Block-wise encryption with the public key of a certificate.

    enc_fn is called as enc_fn(public_key, block) for every block of
    plain text. padding_size is the number of bytes by which a plain
    text block is smaller than the key size.
    """

    def __init__(self, server_cert, enc_fn, padding_size):
        require_cryptography(self)
        self.server_cert = server_cert
        # key_size is stored in bytes.
        self.key_size = self.server_cert.public_key().key_size // 8
        self.encryptor = enc_fn
        self.padding_size = padding_size

    def plain_block_size(self):
        """
        Plain text bytes per block: key size minus padding size.
        """
        return self.key_size - self.padding_size

    def encrypted_block_size(self):
        """
        Encrypted bytes per block: the key size.
        """
        return self.key_size

    def encrypt(self, data):
        """
        Encrypt data block by block and return the concatenated result.

        data is cut into slices of plain_block_size() bytes; the last
        slice may be shorter. Empty data yields b''.
        """
        block_size = self.plain_block_size()
        # The public key is the same for every block, so fetch it once.
        public_key = self.server_cert.public_key()
        # join() avoids re-copying the growing result for each block.
        return b''.join(
            self.encryptor(public_key, data[start: start + block_size])
            for start in range(0, len(data), block_size))
222
223
224 1
class DecryptorRsa(Decryptor):
    """
    Block-wise decryption with a private key.

    dec_fn is called as dec_fn(private_key, block) for every block of
    encrypted text. padding_size is the number of bytes by which a plain
    text block is smaller than the key size.
    """

    def __init__(self, client_pk, dec_fn, padding_size):
        require_cryptography(self)
        self.client_pk = client_pk
        # key_size is stored in bytes.
        self.key_size = self.client_pk.key_size // 8
        self.decryptor = dec_fn
        self.padding_size = padding_size

    def plain_block_size(self):
        """
        Plain text bytes per block: key size minus padding size.
        """
        return self.key_size - self.padding_size

    def encrypted_block_size(self):
        """
        Encrypted bytes per block: the key size.
        """
        return self.key_size

    def decrypt(self, data):
        """
        Decrypt data block by block and return the concatenated result.

        data is cut into slices of encrypted_block_size() bytes. Empty
        data yields b''.
        """
        block_size = self.encrypted_block_size()
        private_key = self.client_pk
        # join() avoids re-copying the growing result for each block.
        return b''.join(
            self.decryptor(private_key, data[start: start + block_size])
            for start in range(0, len(data), block_size))
246
247
248 1
class SignerAesCbc(Signer):
    """
    Symmetric signer: signs with uacrypto.hmac_sha1 and the given key.
    """

    def __init__(self, key):
        require_cryptography(self)
        self.key = key

    def signature_size(self):
        """
        Signature size as reported by uacrypto.sha1_size().
        """
        size = uacrypto.sha1_size()
        return size

    def signature(self, data):
        """
        Return uacrypto.hmac_sha1 of data under the stored key.
        """
        signing_key = self.key
        return uacrypto.hmac_sha1(signing_key, data)
259
260
261 1
class VerifierAesCbc(Verifier):
    """
    Symmetric verifier: recomputes uacrypto.hmac_sha1 with the given key
    and compares it with the received signature.
    """

    def __init__(self, key):
        require_cryptography(self)
        self.key = key

    def signature_size(self):
        """
        Signature size as reported by uacrypto.sha1_size().
        """
        return uacrypto.sha1_size()

    def verify(self, data, signature):
        """
        Check signature against the HMAC of data.

        Returns nothing on success and raises uacrypto.InvalidSignature
        when the signature does not match.
        """
        # Local import so this class does not depend on a file-level import.
        import hmac

        expected = uacrypto.hmac_sha1(self.key, data)
        try:
            # Constant-time comparison: a plain != can leak, through
            # timing, how many leading bytes of a forged signature match.
            matches = hmac.compare_digest(signature, expected)
        except TypeError:
            # Operands of unsuitable types can never match; reject them
            # the same way a failed comparison is rejected.
            matches = False
        if not matches:
            raise uacrypto.InvalidSignature
274
275
276 1
class EncryptorAesCbc(Encryptor):
    """
    Symmetric encryptor built on the cipher returned by
    uacrypto.cipher_aes_cbc(key, init_vec).
    """

    def __init__(self, key, init_vec):
        require_cryptography(self)
        self.cipher = uacrypto.cipher_aes_cbc(key, init_vec)

    # NOTE(review): both block sizes are derived from the key size. If the
    # cipher is AES its block is 16 bytes regardless of key length, so a
    # 32-byte key would report 32 here - confirm this is intended.

    def plain_block_size(self):
        """
        Plain text block size: the cipher algorithm's key_size // 8.
        """
        key_bits = self.cipher.algorithm.key_size
        return key_bits // 8

    def encrypted_block_size(self):
        """
        Encrypted block size: the cipher algorithm's key_size // 8.
        """
        key_bits = self.cipher.algorithm.key_size
        return key_bits // 8

    def encrypt(self, data):
        """
        Encrypt data by passing it with the cipher to uacrypto.cipher_encrypt.
        """
        cipher = self.cipher
        return uacrypto.cipher_encrypt(cipher, data)
290
291
292 1
class DecryptorAesCbc(Decryptor):
    """
    Symmetric decryptor built on the cipher returned by
    uacrypto.cipher_aes_cbc(key, init_vec).
    """

    def __init__(self, key, init_vec):
        require_cryptography(self)
        self.cipher = uacrypto.cipher_aes_cbc(key, init_vec)

    # NOTE(review): both block sizes are derived from the key size. If the
    # cipher is AES its block is 16 bytes regardless of key length, so a
    # 32-byte key would report 32 here - confirm this is intended.

    def plain_block_size(self):
        """
        Plain text block size: the cipher algorithm's key_size // 8.
        """
        key_bits = self.cipher.algorithm.key_size
        return key_bits // 8

    def encrypted_block_size(self):
        """
        Encrypted block size: the cipher algorithm's key_size // 8.
        """
        key_bits = self.cipher.algorithm.key_size
        return key_bits // 8

    def decrypt(self, data):
        """
        Decrypt data by passing it with the cipher to uacrypto.cipher_decrypt.
        """
        cipher = self.cipher
        return uacrypto.cipher_decrypt(cipher, data)
306
307
308 1
class SecurityPolicyBasic128Rsa15(SecurityPolicy):
    """
    Security policy Basic128Rsa15.

    Algorithm suite with RSA15 as key wrap algorithm and 128-bit
    (16 bytes) keys for the encryption algorithms:
    - SymmetricSignatureAlgorithm - HmacSha1
      (http://www.w3.org/2000/09/xmldsig#hmac-sha1)
    - SymmetricEncryptionAlgorithm - Aes128
      (http://www.w3.org/2001/04/xmlenc#aes128-cbc)
    - AsymmetricSignatureAlgorithm - RsaSha1
      (http://www.w3.org/2000/09/xmldsig#rsa-sha1)
    - AsymmetricKeyWrapAlgorithm - KwRsa15
      (http://www.w3.org/2001/04/xmlenc#rsa-1_5)
    - AsymmetricEncryptionAlgorithm - Rsa15
      (http://www.w3.org/2001/04/xmlenc#rsa-1_5)
    - KeyDerivationAlgorithm - PSha1
      (http://docs.oasis-open.org/ws-sx/ws-secureconversation/200512/dk/p_sha1)
    - DerivedSignatureKeyLength - 128 (16 bytes)
    - MinAsymmetricKeyLength - 1024 (128 bytes)
    - MaxAsymmetricKeyLength - 2048 (256 bytes)
    - CertificateSignatureAlgorithm - Sha1

    A certificate shall be rejected if it, or any certificate in its
    chain, is not signed with a hash that is Sha1 or stronger.
    """

    URI = "http://opcfoundation.org/UA/SecurityPolicy#Basic128Rsa15"
    signature_key_size = 16
    symmetric_key_size = 16
    AsymmetricEncryptionURI = "http://www.w3.org/2001/04/xmlenc#rsa-1_5"

    @staticmethod
    def encrypt_asymmetric(pubkey, data):
        """
        Encrypt data for pubkey with uacrypto.encrypt_rsa15.
        """
        return uacrypto.encrypt_rsa15(pubkey, data)

    def __init__(self, server_cert, client_cert, client_pk, mode):
        require_cryptography(self)
        # A bytes server certificate is taken to be DER and parsed first.
        if isinstance(server_cert, bytes):
            server_cert = uacrypto.x509_from_der(server_cert)

        # Secrets transmitted in OpenSecureChannel must be asymmetrically
        # encrypted even in Sign mode, hence SignAndEncrypt regardless of
        # the requested mode.
        asym = self.asymmetric_cryptography = Cryptography(
            MessageSecurityMode.SignAndEncrypt)
        asym.Signer = SignerRsa(client_pk)
        asym.Verifier = VerifierRsa(server_cert)
        # 11 is the padding size in bytes handed to the RSA block classes
        # (presumably the PKCS#1 v1.5 overhead - TODO confirm).
        asym.Encryptor = EncryptorRsa(server_cert, uacrypto.encrypt_rsa15, 11)
        asym.Decryptor = DecryptorRsa(client_pk, uacrypto.decrypt_rsa15, 11)

        self.symmetric_cryptography = Cryptography(mode)
        self.Mode = mode
        self.server_certificate = uacrypto.der_from_x509(server_cert)
        self.client_certificate = uacrypto.der_from_x509(client_cert)

    def make_symmetric_key(self, nonce1, nonce2):
        """
        Derive the symmetric keys from two nonces and install them.

        Signer and Encryptor are keyed from p_sha1(nonce2, nonce1, ...),
        Verifier and Decryptor from p_sha1(nonce1, nonce2, ...).
        """
        # Requested lengths: signing key, encryption key, 16-byte init vector.
        lengths = (self.signature_key_size, self.symmetric_key_size, 16)
        sym = self.symmetric_cryptography

        sign_key, enc_key, enc_iv = uacrypto.p_sha1(nonce2, nonce1, lengths)
        sym.Signer = SignerAesCbc(sign_key)
        sym.Encryptor = EncryptorAesCbc(enc_key, enc_iv)

        check_key, dec_key, dec_iv = uacrypto.p_sha1(nonce1, nonce2, lengths)
        sym.Verifier = VerifierAesCbc(check_key)
        sym.Decryptor = DecryptorAesCbc(dec_key, dec_iv)
372
373
374 1
class SecurityPolicyBasic256(SecurityPolicy):
    """
    Security policy Basic256.

    Algorithm suite for 256-bit (32 bytes) encryption:
    - SymmetricSignatureAlgorithm - HmacSha1
      (http://www.w3.org/2000/09/xmldsig#hmac-sha1)
    - SymmetricEncryptionAlgorithm - Aes256
      (http://www.w3.org/2001/04/xmlenc#aes256-cbc)
    - AsymmetricSignatureAlgorithm - RsaSha1
      (http://www.w3.org/2000/09/xmldsig#rsa-sha1)
    - AsymmetricKeyWrapAlgorithm - KwRsaOaep
      (http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p)
    - AsymmetricEncryptionAlgorithm - RsaOaep
      (http://www.w3.org/2001/04/xmlenc#rsa-oaep)
    - KeyDerivationAlgorithm - PSha1
      (http://docs.oasis-open.org/ws-sx/ws-secureconversation/200512/dk/p_sha1)
    - DerivedSignatureKeyLength - 192 (24 bytes)
    - MinAsymmetricKeyLength - 1024 (128 bytes)
    - MaxAsymmetricKeyLength - 2048 (256 bytes)
    - CertificateSignatureAlgorithm - Sha1

    A certificate shall be rejected if it, or any certificate in its
    chain, is not signed with a hash that is Sha1 or stronger.
    """

    URI = "http://opcfoundation.org/UA/SecurityPolicy#Basic256"
    signature_key_size = 24
    symmetric_key_size = 32
    AsymmetricEncryptionURI = "http://www.w3.org/2001/04/xmlenc#rsa-oaep"

    @staticmethod
    def encrypt_asymmetric(pubkey, data):
        """
        Encrypt data for pubkey with uacrypto.encrypt_rsa_oaep.
        """
        return uacrypto.encrypt_rsa_oaep(pubkey, data)

    def __init__(self, server_cert, client_cert, client_pk, mode):
        require_cryptography(self)
        # A bytes server certificate is taken to be DER and parsed first.
        if isinstance(server_cert, bytes):
            server_cert = uacrypto.x509_from_der(server_cert)

        # Secrets transmitted in OpenSecureChannel must be asymmetrically
        # encrypted even in Sign mode, hence SignAndEncrypt regardless of
        # the requested mode.
        asym = self.asymmetric_cryptography = Cryptography(
            MessageSecurityMode.SignAndEncrypt)
        asym.Signer = SignerRsa(client_pk)
        asym.Verifier = VerifierRsa(server_cert)
        # 42 is the padding size in bytes handed to the RSA block classes
        # (presumably the OAEP/SHA-1 overhead - TODO confirm).
        asym.Encryptor = EncryptorRsa(
            server_cert, uacrypto.encrypt_rsa_oaep, 42)
        asym.Decryptor = DecryptorRsa(
            client_pk, uacrypto.decrypt_rsa_oaep, 42)

        self.symmetric_cryptography = Cryptography(mode)
        self.Mode = mode
        self.server_certificate = uacrypto.der_from_x509(server_cert)
        self.client_certificate = uacrypto.der_from_x509(client_cert)

    def make_symmetric_key(self, nonce1, nonce2):
        """
        Derive the symmetric keys from two nonces and install them.

        Signer and Encryptor are keyed from p_sha1(nonce2, nonce1, ...),
        Verifier and Decryptor from p_sha1(nonce1, nonce2, ...).
        See specs part 6, 6.7.5.
        """
        # Requested lengths: signing key, encryption key, 16-byte init vector.
        lengths = (self.signature_key_size, self.symmetric_key_size, 16)
        sym = self.symmetric_cryptography

        sign_key, enc_key, enc_iv = uacrypto.p_sha1(nonce2, nonce1, lengths)
        sym.Signer = SignerAesCbc(sign_key)
        sym.Encryptor = EncryptorAesCbc(enc_key, enc_iv)

        check_key, dec_key, dec_iv = uacrypto.p_sha1(nonce1, nonce2, lengths)
        sym.Verifier = VerifierAesCbc(check_key)
        sym.Decryptor = DecryptorAesCbc(dec_key, dec_iv)
439
440
441 1
def encrypt_asymmetric(pubkey, data, policy_uri):
    """
    Encrypt data with pubkey using an asymmetric algorithm.

    The algorithm is selected by policy_uri, which is compared with the
    URI of SecurityPolicyBasic256 and SecurityPolicyBasic128Rsa15.

    Returns a tuple (encrypted_data, algorithm_uri).
    Raises UaError if policy_uri matches neither policy.
    """
    for cls in (SecurityPolicyBasic256, SecurityPolicyBasic128Rsa15):
        if policy_uri == cls.URI:
            return (cls.encrypt_asymmetric(pubkey, data),
                    cls.AsymmetricEncryptionURI)
    # Report the offending URI. The previous code formatted the undefined
    # name `uri`, which raised NameError instead of the intended UaError.
    raise UaError("Unsupported security policy `{}`".format(policy_uri))
452