Completed
Pull Request — master (#138)
by Alexander
02:04
created

opcua.crypto.encrypt_asymmetric()   B

Complexity

Conditions 5

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 30
Metric Value
cc 5
dl 0
loc 13
ccs 0
cts 4
cp 0
crap 30
rs 8.5454
1 1
from abc import ABCMeta, abstractmethod
2 1
from opcua.ua import CryptographyNone, SecurityPolicy
3 1
from opcua.ua import MessageSecurityMode
4 1
from opcua.ua import UaError
5 1
try:
6 1
    from opcua.crypto import uacrypto
7
    CRYPTOGRAPHY_AVAILABLE = True
8 1
except ImportError:
9 1
    CRYPTOGRAPHY_AVAILABLE = False
10
11
12 1
# URI of the "None" security policy; encrypt_asymmetric() returns the data
# unencrypted (with an empty algorithm URI) when given this policy URI.
POLICY_NONE_URI = 'http://opcfoundation.org/UA/SecurityPolicy#None'
13
14
15
def require_cryptography(obj):
    """
    Guard for classes that need the optional crypto backend.

    Intended to be called from constructors. Does nothing when the
    ``uacrypto`` import at module load succeeded; otherwise raises
    UaError with a message naming the class of ``obj``.
    """
    if CRYPTOGRAPHY_AVAILABLE:
        return
    class_name = obj.__class__.__name__
    raise UaError("Can't use {}, cryptography module is not installed".format(class_name))
22
23
24
class Signer(ABCMeta('_SignerABC', (object,), {})):
    """
    Abstract base class for cryptographic signature algorithm

    The intermediate base class is built by calling ABCMeta directly so
    that the metaclass is applied on both Python 2 and Python 3. A
    class-level ``__metaclass__`` attribute is ignored by Python 3, which
    would leave the abstract methods unenforced.
    """

    @abstractmethod
    def signature_size(self):
        """Return the size of the signatures produced by this signer."""
        pass

    @abstractmethod
    def signature(self, data):
        """Return the signature computed over ``data``."""
        pass
38
39
40
class Verifier(ABCMeta('_VerifierABC', (object,), {})):
    """
    Abstract base class for cryptographic signature verification

    The intermediate base class is built by calling ABCMeta directly so
    that the metaclass is applied on both Python 2 and Python 3. A
    class-level ``__metaclass__`` attribute is ignored by Python 3, which
    would leave the abstract methods unenforced.
    """

    @abstractmethod
    def signature_size(self):
        """Return the size of the signatures this verifier expects."""
        pass

    @abstractmethod
    def verify(self, data, signature):
        """Check ``signature`` against ``data``."""
        pass
54
55
56
class Encryptor(ABCMeta('_EncryptorABC', (object,), {})):
    """
    Abstract base class for encryption algorithm

    The intermediate base class is built by calling ABCMeta directly so
    that the metaclass is applied on both Python 2 and Python 3. A
    class-level ``__metaclass__`` attribute is ignored by Python 3, which
    would leave the abstract methods unenforced.
    """

    @abstractmethod
    def plain_block_size(self):
        """Return the size of one plain-text block."""
        pass

    @abstractmethod
    def encrypted_block_size(self):
        """Return the size of one encrypted block."""
        pass

    @abstractmethod
    def encrypt(self, data):
        """Return ``data`` in encrypted form."""
        pass
74
75
76
class Decryptor(ABCMeta('_DecryptorABC', (object,), {})):
    """
    Abstract base class for decryption algorithm

    The intermediate base class is built by calling ABCMeta directly so
    that the metaclass is applied on both Python 2 and Python 3. A
    class-level ``__metaclass__`` attribute is ignored by Python 3, which
    would leave the abstract methods unenforced.
    """

    @abstractmethod
    def plain_block_size(self):
        """Return the size of one plain-text block."""
        pass

    @abstractmethod
    def encrypted_block_size(self):
        """Return the size of one encrypted block."""
        pass

    @abstractmethod
    def decrypt(self, data):
        """Return ``data`` in decrypted form."""
        pass
94
95
96
class Cryptography(CryptographyNone):
    """
    Security policy: Sign or SignAndEncrypt

    All work is delegated to the objects stored in the ``Signer``,
    ``Verifier``, ``Encryptor`` and ``Decryptor`` attributes. They start
    out as None and must be assigned by the owner before use.
    """

    def __init__(self, mode=MessageSecurityMode.Sign):
        self.Signer = None
        self.Verifier = None
        self.Encryptor = None
        self.Decryptor = None
        supported_modes = (MessageSecurityMode.Sign,
                           MessageSecurityMode.SignAndEncrypt)
        assert mode in supported_modes
        self.is_encrypted = (mode == MessageSecurityMode.SignAndEncrypt)

    def plain_block_size(self):
        """
        Size of plain text block for block cipher (1 when not encrypting).
        """
        return self.Encryptor.plain_block_size() if self.is_encrypted else 1

    def encrypted_block_size(self):
        """
        Size of encrypted text block for block cipher (1 when not encrypting).
        """
        return self.Encryptor.encrypted_block_size() if self.is_encrypted else 1

    def padding(self, size):
        """
        Create padding for a block of given size.
        plain_size = size + len(padding) + signature_size()
        plain_size = N * plain_block_size()

        Every padding byte holds the value len(padding) - 1. Returns an
        empty byte string when encryption is disabled.
        """
        if not self.is_encrypted:
            return b''
        block_size = self.Encryptor.plain_block_size()
        # The "+ 1" accounts for the padding byte that is always present.
        overflow = (size + self.signature_size() + 1) % block_size
        pad_value = block_size - overflow if overflow != 0 else 0
        return bytes(bytearray([pad_value] * (pad_value + 1)))

    def min_padding_size(self):
        """Smallest possible padding: 1 when encrypting, otherwise 0."""
        return 1 if self.is_encrypted else 0

    def signature_size(self):
        """Signature size reported by the Signer."""
        return self.Signer.signature_size()

    def signature(self, data):
        """Signature of ``data`` as produced by the Signer."""
        return self.Signer.signature(data)

    def vsignature_size(self):
        """Signature size reported by the Verifier."""
        return self.Verifier.signature_size()

    def verify(self, data, sig):
        """Hand ``data`` and ``sig`` to the Verifier; returns nothing."""
        self.Verifier.verify(data, sig)

    def encrypt(self, data):
        """
        Encrypt ``data`` with the Encryptor, or return it unchanged when
        encryption is disabled. ``data`` must be a whole number of plain
        blocks (checked with an assert).
        """
        if not self.is_encrypted:
            return data
        assert len(data) % self.Encryptor.plain_block_size() == 0
        return self.Encryptor.encrypt(data)

    def decrypt(self, data):
        """
        Decrypt ``data`` with the Decryptor, or return it unchanged when
        encryption is disabled.
        """
        if not self.is_encrypted:
            return data
        return self.Decryptor.decrypt(data)

    def remove_padding(self, data):
        """
        Strip the trailing padding from ``data`` when encryption is enabled.

        The last byte holds the padding length minus one, so that many
        bytes plus one are cut from the end.
        """
        if not self.is_encrypted:
            return data
        last_byte = bytearray(data[-1:])[0]
        return data[:-(last_byte + 1)]
173
174 1
175
class SignerRsa(Signer):
    """
    Signer that signs with a private key through uacrypto.sign_sha1.
    """

    def __init__(self, client_pk):
        require_cryptography(self)
        self.client_pk = client_pk
        # key_size of the key object divided by 8 (presumably bits -> bytes).
        self.key_size = self.client_pk.key_size // 8

    def signature_size(self):
        """Signature size, equal to the stored key size."""
        return self.key_size

    def signature(self, data):
        """Sign ``data`` with the private key."""
        private_key = self.client_pk
        return uacrypto.sign_sha1(private_key, data)
187
188 1
189
class VerifierRsa(Verifier):
    """
    Verifier that checks signatures against a certificate through
    uacrypto.verify_sha1.
    """

    def __init__(self, server_cert):
        require_cryptography(self)
        self.server_cert = server_cert
        # key_size of the certificate's public key divided by 8
        # (presumably bits -> bytes).
        self.key_size = self.server_cert.public_key().key_size // 8

    def signature_size(self):
        """Signature size, equal to the stored key size."""
        return self.key_size

    def verify(self, data, signature):
        """Delegate the check to uacrypto.verify_sha1; returns nothing."""
        certificate = self.server_cert
        uacrypto.verify_sha1(certificate, data, signature)
201
202 1
203
class EncryptorRsa(Encryptor):
    """
    Block-wise asymmetric encryptor using the public key of a certificate.

    ``enc_fn`` is called as ``enc_fn(public_key, block)`` for every plain
    block. ``padding_size`` is subtracted from the key size to get the
    plain block size.
    """

    def __init__(self, server_cert, enc_fn, padding_size):
        require_cryptography(self)
        self.server_cert = server_cert
        self.key_size = self.server_cert.public_key().key_size // 8
        self.encryptor = enc_fn
        self.padding_size = padding_size

    def plain_block_size(self):
        """Key size minus the padding overhead."""
        return self.key_size - self.padding_size

    def encrypted_block_size(self):
        """One encrypted block is as large as the key."""
        return self.key_size

    def encrypt(self, data):
        """
        Split ``data`` into plain blocks, encrypt each one and return the
        concatenated result.
        """
        step = self.plain_block_size()
        chunks = [
            self.encryptor(self.server_cert.public_key(),
                           data[start: start + step])
            for start in range(0, len(data), step)
        ]
        return b''.join(chunks)
225
226 1
227
class DecryptorRsa(Decryptor):
    """
    Block-wise asymmetric decryptor using a private key.

    ``dec_fn`` is called as ``dec_fn(private_key, block)`` for every
    encrypted block. ``padding_size`` is subtracted from the key size to
    get the plain block size.
    """

    def __init__(self, client_pk, dec_fn, padding_size):
        require_cryptography(self)
        self.client_pk = client_pk
        self.key_size = self.client_pk.key_size // 8
        self.decryptor = dec_fn
        self.padding_size = padding_size

    def plain_block_size(self):
        """Key size minus the padding overhead."""
        return self.key_size - self.padding_size

    def encrypted_block_size(self):
        """One encrypted block is as large as the key."""
        return self.key_size

    def decrypt(self, data):
        """
        Split ``data`` into encrypted blocks, decrypt each one and return
        the concatenated result.
        """
        step = self.encrypted_block_size()
        chunks = [
            self.decryptor(self.client_pk, data[start: start + step])
            for start in range(0, len(data), step)
        ]
        return b''.join(chunks)
249
250 1
251
class SignerAesCbc(Signer):
    """
    Symmetric signer: signs with uacrypto.hmac_sha1 using the given key.
    """

    def __init__(self, key):
        require_cryptography(self)
        self.key = key

    def signature_size(self):
        """Signature size as reported by uacrypto.sha1_size()."""
        size = uacrypto.sha1_size()
        return size

    def signature(self, data):
        """HMAC-SHA1 of ``data`` under the stored key."""
        digest = uacrypto.hmac_sha1(self.key, data)
        return digest
262
263 1
264
class VerifierAesCbc(Verifier):
    """
    Symmetric verifier: recomputes uacrypto.hmac_sha1 over the data with
    the given key and compares it with the received signature.
    """

    def __init__(self, key):
        require_cryptography(self)
        self.key = key

    def signature_size(self):
        """Signature size as reported by uacrypto.sha1_size()."""
        return uacrypto.sha1_size()

    def verify(self, data, signature):
        """
        Check ``signature`` against the HMAC-SHA1 of ``data``.

        Raises uacrypto.InvalidSignature on mismatch; returns nothing on
        success.
        """
        # Local import keeps this block self-contained.
        import hmac

        expected = uacrypto.hmac_sha1(self.key, data)
        # Constant-time comparison: a plain ``!=`` stops at the first
        # differing byte and so leaks, through timing, how much of a forged
        # signature was correct.
        try:
            matches = hmac.compare_digest(signature, expected)
        except TypeError:
            # compare_digest rejects unsupported operand types; such input
            # can never be a valid signature.
            matches = False
        if not matches:
            raise uacrypto.InvalidSignature
277
278 1
279
class EncryptorAesCbc(Encryptor):
    """
    Symmetric encryptor built on the cipher returned by
    uacrypto.cipher_aes_cbc(key, init_vec).
    """

    def __init__(self, key, init_vec):
        require_cryptography(self)
        self.cipher = uacrypto.cipher_aes_cbc(key, init_vec)

    def plain_block_size(self):
        """Plain block size, derived from the cipher algorithm's key size."""
        # NOTE(review): this reports key_size // 8, not the cipher's block
        # size; for AES those differ when the key is longer than 128 bits.
        # Confirm this is intended.
        algorithm = self.cipher.algorithm
        return algorithm.key_size // 8

    def encrypted_block_size(self):
        """Encrypted block size, same value as the plain block size."""
        algorithm = self.cipher.algorithm
        return algorithm.key_size // 8

    def encrypt(self, data):
        """Encrypt ``data`` through uacrypto.cipher_encrypt."""
        cipher = self.cipher
        return uacrypto.cipher_encrypt(cipher, data)
293
294 1
295
class DecryptorAesCbc(Decryptor):
    """
    Symmetric decryptor built on the cipher returned by
    uacrypto.cipher_aes_cbc(key, init_vec).
    """

    def __init__(self, key, init_vec):
        require_cryptography(self)
        self.cipher = uacrypto.cipher_aes_cbc(key, init_vec)

    def plain_block_size(self):
        """Plain block size, derived from the cipher algorithm's key size."""
        # NOTE(review): this reports key_size // 8, not the cipher's block
        # size; for AES those differ when the key is longer than 128 bits.
        # Confirm this is intended.
        algorithm = self.cipher.algorithm
        return algorithm.key_size // 8

    def encrypted_block_size(self):
        """Encrypted block size, same value as the plain block size."""
        algorithm = self.cipher.algorithm
        return algorithm.key_size // 8

    def decrypt(self, data):
        """Decrypt ``data`` through uacrypto.cipher_decrypt."""
        cipher = self.cipher
        return uacrypto.cipher_decrypt(cipher, data)
309
310
311
class SecurityPolicyBasic128Rsa15(SecurityPolicy):
    """
    Security Basic 128Rsa15
    A suite of algorithms that uses RSA15 as Key-Wrap-algorithm
    and 128-Bit (16 bytes) for encryption algorithms.
    - SymmetricSignatureAlgorithm - HmacSha1
      (http://www.w3.org/2000/09/xmldsig#hmac-sha1)
    - SymmetricEncryptionAlgorithm - Aes128
      (http://www.w3.org/2001/04/xmlenc#aes128-cbc)
    - AsymmetricSignatureAlgorithm - RsaSha1
      (http://www.w3.org/2000/09/xmldsig#rsa-sha1)
    - AsymmetricKeyWrapAlgorithm - KwRsa15
      (http://www.w3.org/2001/04/xmlenc#rsa-1_5)
    - AsymmetricEncryptionAlgorithm - Rsa15
      (http://www.w3.org/2001/04/xmlenc#rsa-1_5)
    - KeyDerivationAlgorithm - PSha1
      (http://docs.oasis-open.org/ws-sx/ws-secureconversation/200512/dk/p_sha1)
    - DerivedSignatureKeyLength - 128 (16 bytes)
    - MinAsymmetricKeyLength - 1024 (128 bytes)
    - MaxAsymmetricKeyLength - 2048 (256 bytes)
    - CertificateSignatureAlgorithm - Sha1

    If a certificate or any certificate in the chain is not signed with
    a hash that is Sha1 or stronger then the certificate shall be rejected.
    """

    URI = "http://opcfoundation.org/UA/SecurityPolicy#Basic128Rsa15"
    # Sizes, in bytes, of the derived signing and encryption keys.
    signature_key_size = 16
    symmetric_key_size = 16
    AsymmetricEncryptionURI = "http://www.w3.org/2001/04/xmlenc#rsa-1_5"

    @staticmethod
    def encrypt_asymmetric(pubkey, data):
        """Encrypt ``data`` with ``pubkey`` through uacrypto.encrypt_rsa15."""
        return uacrypto.encrypt_rsa15(pubkey, data)

    def __init__(self, server_cert, client_cert, client_pk, mode):
        """
        Set up the asymmetric crypto objects for this policy.

        ``server_cert`` may be given as DER bytes, in which case it is
        parsed first. The symmetric side only gets its mode here; its
        keys are installed later by make_symmetric_key().
        """
        require_cryptography(self)
        if isinstance(server_cert, bytes):
            server_cert = uacrypto.x509_from_der(server_cert)
        # even in Sign mode we need to asymmetrically encrypt secrets
        # transmitted in OpenSecureChannel. So SignAndEncrypt here
        asym = self.asymmetric_cryptography = Cryptography(
            MessageSecurityMode.SignAndEncrypt)
        asym.Signer = SignerRsa(client_pk)
        asym.Verifier = VerifierRsa(server_cert)
        asym.Encryptor = EncryptorRsa(server_cert, uacrypto.encrypt_rsa15, 11)
        asym.Decryptor = DecryptorRsa(client_pk, uacrypto.decrypt_rsa15, 11)
        self.symmetric_cryptography = Cryptography(mode)
        self.Mode = mode
        self.server_certificate = uacrypto.der_from_x509(server_cert)
        self.client_certificate = uacrypto.der_from_x509(client_cert)

    def make_symmetric_key(self, nonce1, nonce2):
        """
        Derive the symmetric keys from the two nonces with uacrypto.p_sha1
        and install them on the symmetric crypto object.

        p_sha1(nonce2, nonce1, ...) feeds the Signer/Encryptor and
        p_sha1(nonce1, nonce2, ...) feeds the Verifier/Decryptor.
        """
        sym = self.symmetric_cryptography
        # signing key, encryption key, 16-byte initialization vector
        lengths = (self.signature_key_size, self.symmetric_key_size, 16)

        sign_key, enc_key, enc_iv = uacrypto.p_sha1(nonce2, nonce1, lengths)
        sym.Signer = SignerAesCbc(sign_key)
        sym.Encryptor = EncryptorAesCbc(enc_key, enc_iv)

        verify_key, dec_key, dec_iv = uacrypto.p_sha1(nonce1, nonce2, lengths)
        sym.Verifier = VerifierAesCbc(verify_key)
        sym.Decryptor = DecryptorAesCbc(dec_key, dec_iv)
375
376
377
class SecurityPolicyBasic256(SecurityPolicy):
    """
    Security Basic 256
    A suite of algorithms that are for 256-Bit (32 bytes) encryption,
    algorithms include:
    - SymmetricSignatureAlgorithm - HmacSha1
      (http://www.w3.org/2000/09/xmldsig#hmac-sha1)
    - SymmetricEncryptionAlgorithm - Aes256
      (http://www.w3.org/2001/04/xmlenc#aes256-cbc)
    - AsymmetricSignatureAlgorithm - RsaSha1
      (http://www.w3.org/2000/09/xmldsig#rsa-sha1)
    - AsymmetricKeyWrapAlgorithm - KwRsaOaep
      (http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p)
    - AsymmetricEncryptionAlgorithm - RsaOaep
      (http://www.w3.org/2001/04/xmlenc#rsa-oaep)
    - KeyDerivationAlgorithm - PSha1
      (http://docs.oasis-open.org/ws-sx/ws-secureconversation/200512/dk/p_sha1)
    - DerivedSignatureKeyLength - 192 (24 bytes)
    - MinAsymmetricKeyLength - 1024 (128 bytes)
    - MaxAsymmetricKeyLength - 2048 (256 bytes)
    - CertificateSignatureAlgorithm - Sha1

    If a certificate or any certificate in the chain is not signed with
    a hash that is Sha1 or stronger then the certificate shall be rejected.
    """

    URI = "http://opcfoundation.org/UA/SecurityPolicy#Basic256"
    # Sizes, in bytes, of the derived signing and encryption keys.
    signature_key_size = 24
    symmetric_key_size = 32
    AsymmetricEncryptionURI = "http://www.w3.org/2001/04/xmlenc#rsa-oaep"

    @staticmethod
    def encrypt_asymmetric(pubkey, data):
        """Encrypt ``data`` with ``pubkey`` through uacrypto.encrypt_rsa_oaep."""
        return uacrypto.encrypt_rsa_oaep(pubkey, data)

    def __init__(self, server_cert, client_cert, client_pk, mode):
        """
        Set up the asymmetric crypto objects for this policy.

        ``server_cert`` may be given as DER bytes, in which case it is
        parsed first. The symmetric side only gets its mode here; its
        keys are installed later by make_symmetric_key().
        """
        require_cryptography(self)
        if isinstance(server_cert, bytes):
            server_cert = uacrypto.x509_from_der(server_cert)
        # even in Sign mode we need to asymmetrically encrypt secrets
        # transmitted in OpenSecureChannel. So SignAndEncrypt here
        asym = self.asymmetric_cryptography = Cryptography(
            MessageSecurityMode.SignAndEncrypt)
        asym.Signer = SignerRsa(client_pk)
        asym.Verifier = VerifierRsa(server_cert)
        asym.Encryptor = EncryptorRsa(server_cert, uacrypto.encrypt_rsa_oaep, 42)
        asym.Decryptor = DecryptorRsa(client_pk, uacrypto.decrypt_rsa_oaep, 42)
        self.symmetric_cryptography = Cryptography(mode)
        self.Mode = mode
        self.server_certificate = uacrypto.der_from_x509(server_cert)
        self.client_certificate = uacrypto.der_from_x509(client_cert)

    def make_symmetric_key(self, nonce1, nonce2):
        """
        Derive the symmetric keys from the two nonces with uacrypto.p_sha1
        and install them on the symmetric crypto object.

        p_sha1(nonce2, nonce1, ...) feeds the Signer/Encryptor and
        p_sha1(nonce1, nonce2, ...) feeds the Verifier/Decryptor.
        """
        # specs part 6, 6.7.5
        sym = self.symmetric_cryptography
        # signing key, encryption key, 16-byte initialization vector
        lengths = (self.signature_key_size, self.symmetric_key_size, 16)

        sign_key, enc_key, enc_iv = uacrypto.p_sha1(nonce2, nonce1, lengths)
        sym.Signer = SignerAesCbc(sign_key)
        sym.Encryptor = EncryptorAesCbc(enc_key, enc_iv)

        verify_key, dec_key, dec_iv = uacrypto.p_sha1(nonce1, nonce2, lengths)
        sym.Verifier = VerifierAesCbc(verify_key)
        sym.Decryptor = DecryptorAesCbc(dec_key, dec_iv)
442
443
444
def encrypt_asymmetric(pubkey, data, policy_uri):
    """
    Encrypt data with pubkey using an asymmetric algorithm.
    The algorithm is selected by policy_uri.
    Returns a tuple (encrypted_data, algorithm_uri)

    An empty/None policy_uri or the "None" policy URI yields the data
    unchanged with an empty algorithm URI. Any other unknown policy URI
    raises UaError.
    """
    known_policies = (SecurityPolicyBasic256, SecurityPolicyBasic128Rsa15)
    matching = next(
        (policy for policy in known_policies if policy_uri == policy.URI),
        None)
    if matching is not None:
        encrypted = matching.encrypt_asymmetric(pubkey, data)
        return (encrypted, matching.AsymmetricEncryptionURI)
    if not policy_uri or policy_uri == POLICY_NONE_URI:
        return (data, '')
    raise UaError("Unsupported security policy `{}`".format(policy_uri))
457