Completed
Push — master ( 271a32...7dfda5 )
by Olivier
16:15
created

__init__()   A

Complexity

Conditions 2

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 5.1463
Metric Value
cc 2
dl 0
loc 18
ccs 1
cts 13
cp 0.0769
crap 5.1463
rs 9.4286
1 1
from opcua.ua import CryptographyNone, SecurityPolicy
2 1
from opcua.ua import MessageSecurityMode
3 1
from abc import ABCMeta, abstractmethod
4 1
try:
5 1
    from opcua.crypto import uacrypto
6
    CRYPTOGRAPHY_AVAILABLE = True
7 1
except ImportError:
8 1
    CRYPTOGRAPHY_AVAILABLE = False
9
10
11 1
def require_cryptography(obj):
    """
    Guard for constructors of classes that need the cryptography module.

    Does nothing when the module-level CRYPTOGRAPHY_AVAILABLE flag is true.
    Otherwise raises Exception with a message naming the class of `obj`.
    """
    if CRYPTOGRAPHY_AVAILABLE:
        return
    class_name = obj.__class__.__name__
    message = "Can't use {}, cryptography module is not installed"
    raise Exception(message.format(class_name))
19
20
21 1
class Signer(ABCMeta("_SignerBase", (object,), {})):
    """
    Abstract base class for cryptographic signature algorithm

    The base class is created by calling ABCMeta directly so that the
    abstract methods are enforced on Python 2 and Python 3 alike: a bare
    ``__metaclass__`` attribute is ignored by Python 3, which would let
    this class (and incomplete subclasses) be instantiated.
    """

    # Retained for backward compatibility with code that inspects it.
    __metaclass__ = ABCMeta

    @abstractmethod
    def signature_size(self):
        """Return the size of the signatures produced by `signature`."""

    @abstractmethod
    def signature(self, data):
        """Return the signature computed over `data`."""
35
36
37 1
class Verifier(ABCMeta("_VerifierBase", (object,), {})):
    """
    Abstract base class for cryptographic signature verification

    The base class is created by calling ABCMeta directly so that the
    abstract methods are enforced on Python 2 and Python 3 alike: a bare
    ``__metaclass__`` attribute is ignored by Python 3, which would let
    this class (and incomplete subclasses) be instantiated.
    """

    # Retained for backward compatibility with code that inspects it.
    __metaclass__ = ABCMeta

    @abstractmethod
    def signature_size(self):
        """Return the size of the signatures accepted by `verify`."""

    @abstractmethod
    def verify(self, data, signature):
        """Check `signature` against `data`."""
51
52
53 1
class Encryptor(ABCMeta("_EncryptorBase", (object,), {})):
    """
    Abstract base class for encryption algorithm

    The base class is created by calling ABCMeta directly so that the
    abstract methods are enforced on Python 2 and Python 3 alike: a bare
    ``__metaclass__`` attribute is ignored by Python 3, which would let
    this class (and incomplete subclasses) be instantiated.
    """

    # Retained for backward compatibility with code that inspects it.
    __metaclass__ = ABCMeta

    @abstractmethod
    def plain_block_size(self):
        """Return the size of one block of plain text."""

    @abstractmethod
    def encrypted_block_size(self):
        """Return the size of one block of encrypted text."""

    @abstractmethod
    def encrypt(self, data):
        """Return the encrypted form of `data`."""
71
72
73 1
class Decryptor(ABCMeta("_DecryptorBase", (object,), {})):
    """
    Abstract base class for decryption algorithm

    The base class is created by calling ABCMeta directly so that the
    abstract methods are enforced on Python 2 and Python 3 alike: a bare
    ``__metaclass__`` attribute is ignored by Python 3, which would let
    this class (and incomplete subclasses) be instantiated.
    """

    # Retained for backward compatibility with code that inspects it.
    __metaclass__ = ABCMeta

    @abstractmethod
    def plain_block_size(self):
        """Return the size of one block of plain text."""

    @abstractmethod
    def encrypted_block_size(self):
        """Return the size of one block of encrypted text."""

    @abstractmethod
    def decrypt(self, data):
        """Return the decrypted form of `data`."""
91
92
93 1
class Cryptography(CryptographyNone):
    """
    Security policy: Sign or SignAndEncrypt

    All work is delegated to the objects stored in the `Signer`,
    `Verifier`, `Encryptor` and `Decryptor` attributes. They start out as
    None and must be assigned before the corresponding methods are used.
    Encryption, decryption and padding are only applied when
    `is_encrypted` is true.
    """

    def __init__(self, mode=MessageSecurityMode.Sign):
        self.Signer = None
        self.Verifier = None
        self.Encryptor = None
        self.Decryptor = None
        allowed_modes = (MessageSecurityMode.Sign,
                         MessageSecurityMode.SignAndEncrypt)
        assert mode in allowed_modes
        self.is_encrypted = (mode == MessageSecurityMode.SignAndEncrypt)

    def plain_block_size(self):
        """
        Size of plain text block for block cipher (1 when not encrypting).
        """
        if not self.is_encrypted:
            return 1
        return self.Encryptor.plain_block_size()

    def encrypted_block_size(self):
        """
        Size of encrypted text block for block cipher (1 when not encrypting).
        """
        if not self.is_encrypted:
            return 1
        return self.Encryptor.encrypted_block_size()

    def padding(self, size):
        """
        Create padding for a block of given size.
        plain_size = size + len(padding) + signature_size()
        plain_size = N * plain_block_size()

        Returns an empty byte string when not encrypting.
        """
        if self.is_encrypted:
            block = self.Encryptor.plain_block_size()
            # the extra 1 is the byte that is always emitted, even when no
            # further fill is required
            used = (size + self.signature_size() + 1) % block
            fill = (block - used) if used else 0
            # (fill + 1) bytes, each holding the value `fill`
            return bytes(bytearray([fill])) * (fill + 1)
        return b''

    def min_padding_size(self):
        """
        Smallest padding `padding` can produce: 1 when encrypting, else 0.
        """
        return 1 if self.is_encrypted else 0

    def signature_size(self):
        """Size of the signatures produced by the Signer."""
        return self.Signer.signature_size()

    def signature(self, data):
        """Signature of `data`, as computed by the Signer."""
        return self.Signer.signature(data)

    def vsignature_size(self):
        """Size of the signatures expected by the Verifier."""
        return self.Verifier.signature_size()

    def verify(self, data, sig):
        """Hand `data` and `sig` to the Verifier; its result is discarded."""
        self.Verifier.verify(data, sig)

    def encrypt(self, data):
        """
        Encrypt `data` with the Encryptor, or return it unchanged when not
        encrypting. `data` must be a whole number of plain blocks.
        """
        if not self.is_encrypted:
            return data
        assert len(data) % self.Encryptor.plain_block_size() == 0
        return self.Encryptor.encrypt(data)

    def decrypt(self, data):
        """
        Decrypt `data` with the Decryptor, or return it unchanged when not
        encrypting.
        """
        if not self.is_encrypted:
            return data
        return self.Decryptor.decrypt(data)

    def remove_padding(self, data):
        """
        Strip the trailing padding from `data` when encrypting.

        The last byte holds the fill count; that many bytes plus the count
        byte itself are removed.
        """
        if not self.is_encrypted:
            return data
        fill = bytearray(data[-1:])[0]
        return data[:-(fill + 1)]
169
170
171 1
class SignerRsa(Signer):
    """
    Signer that signs with a private key through uacrypto.sign_sha1.
    """

    def __init__(self, client_pk):
        require_cryptography(self)
        self.client_pk = client_pk
        # presumably converts the key size from bits to bytes -- confirm
        self.key_size = client_pk.key_size // 8

    def signature_size(self):
        """Signature size, taken to be the key size computed in __init__."""
        return self.key_size

    def signature(self, data):
        """Return uacrypto.sign_sha1 of `data` under the stored key."""
        private_key = self.client_pk
        return uacrypto.sign_sha1(private_key, data)
182
183
184 1
class VerifierRsa(Verifier):
    """
    Verifier that checks signatures with a certificate through
    uacrypto.verify_sha1.
    """

    def __init__(self, server_cert):
        require_cryptography(self)
        self.server_cert = server_cert
        public_key = server_cert.public_key()
        # presumably converts the key size from bits to bytes -- confirm
        self.key_size = public_key.key_size // 8

    def signature_size(self):
        """Signature size, taken to be the key size computed in __init__."""
        return self.key_size

    def verify(self, data, signature):
        """Delegate the check to uacrypto.verify_sha1; returns nothing."""
        certificate = self.server_cert
        uacrypto.verify_sha1(certificate, data, signature)
195
196
197 1
class EncryptorRsa(Encryptor):
    """
    Block-wise encryptor using the public key of a certificate.

    `enc_fn` is called as ``enc_fn(public_key, block)`` for every plain
    block. `padding_size` is subtracted from the key size to obtain the
    plain block size.
    """

    def __init__(self, server_cert, enc_fn, padding_size):
        require_cryptography(self)
        self.server_cert = server_cert
        # presumably converts the key size from bits to bytes -- confirm
        self.key_size = self.server_cert.public_key().key_size // 8
        self.encryptor = enc_fn
        self.padding_size = padding_size

    def plain_block_size(self):
        """Key size minus the padding size given to the constructor."""
        return self.key_size - self.padding_size

    def encrypted_block_size(self):
        """Key size computed in __init__."""
        return self.key_size

    def encrypt(self, data):
        """
        Encrypt `data` one plain block at a time and return the
        concatenated result.
        """
        block_size = self.plain_block_size()
        # the key does not change between blocks: fetch it once
        public_key = self.server_cert.public_key()
        # join once instead of growing a bytes object inside the loop
        return b''.join([
            self.encryptor(public_key, data[start:start + block_size])
            for start in range(0, len(data), block_size)
        ])
218
219
220 1
class DecryptorRsa(Decryptor):
    """
    Block-wise decryptor using a private key.

    `dec_fn` is called as ``dec_fn(private_key, block)`` for every
    encrypted block. `padding_size` is subtracted from the key size to
    obtain the plain block size.
    """

    def __init__(self, client_pk, dec_fn, padding_size):
        require_cryptography(self)
        self.client_pk = client_pk
        # presumably converts the key size from bits to bytes -- confirm
        self.key_size = self.client_pk.key_size // 8
        self.decryptor = dec_fn
        self.padding_size = padding_size

    def plain_block_size(self):
        """Key size minus the padding size given to the constructor."""
        return self.key_size - self.padding_size

    def encrypted_block_size(self):
        """Key size computed in __init__."""
        return self.key_size

    def decrypt(self, data):
        """
        Decrypt `data` one encrypted block at a time and return the
        concatenated result.
        """
        block_size = self.encrypted_block_size()
        private_key = self.client_pk
        # join once instead of growing a bytes object inside the loop
        return b''.join([
            self.decryptor(private_key, data[start:start + block_size])
            for start in range(0, len(data), block_size)
        ])
241
242
243 1
class SignerAesCbc(Signer):
    """
    Signer that computes uacrypto.hmac_sha1 with a fixed key.
    """

    def __init__(self, key):
        require_cryptography(self)
        self.key = key

    def signature_size(self):
        """Signature size as reported by uacrypto.sha1_size()."""
        size = uacrypto.sha1_size()
        return size

    def signature(self, data):
        """Return uacrypto.hmac_sha1 of `data` under the stored key."""
        digest = uacrypto.hmac_sha1(self.key, data)
        return digest
253
254
255 1
class VerifierAesCbc(Verifier):
    """
    Verifier that recomputes uacrypto.hmac_sha1 with a fixed key and
    compares it with the received signature.
    """

    def __init__(self, key):
        require_cryptography(self)
        self.key = key

    def signature_size(self):
        """Signature size as reported by uacrypto.sha1_size()."""
        return uacrypto.sha1_size()

    def verify(self, data, signature):
        """
        Raise uacrypto.InvalidSignature unless `signature` equals the HMAC
        of `data` under the stored key. Returns nothing on success.
        """
        # local import: keeps this block self-contained
        import hmac

        expected = uacrypto.hmac_sha1(self.key, data)
        try:
            # constant-time comparison, so the time taken does not reveal
            # how many leading bytes of a forged signature were correct
            matches = hmac.compare_digest(expected, signature)
        except TypeError:
            # operand types compare_digest rejects: keep the previous
            # plain-equality outcome instead of raising a new error type
            matches = (signature == expected)
        if not matches:
            raise uacrypto.InvalidSignature
267
268
269 1
class EncryptorAesCbc(Encryptor):
    """
    Encryptor built on the cipher returned by uacrypto.cipher_aes_cbc.
    """

    def __init__(self, key, init_vec):
        require_cryptography(self)
        self.cipher = uacrypto.cipher_aes_cbc(key, init_vec)

    def plain_block_size(self):
        """Block size derived from the key size of the cipher algorithm."""
        # NOTE(review): uses the algorithm's key size rather than a block
        # size attribute -- confirm this is intended for larger keys
        algorithm = self.cipher.algorithm
        return algorithm.key_size // 8

    def encrypted_block_size(self):
        """Same value as plain_block_size()."""
        algorithm = self.cipher.algorithm
        return algorithm.key_size // 8

    def encrypt(self, data):
        """Return uacrypto.cipher_encrypt of `data` with the stored cipher."""
        cipher = self.cipher
        return uacrypto.cipher_encrypt(cipher, data)
282
283
284 1
class DecryptorAesCbc(Decryptor):
    """
    Decryptor built on the cipher returned by uacrypto.cipher_aes_cbc.
    """

    def __init__(self, key, init_vec):
        require_cryptography(self)
        self.cipher = uacrypto.cipher_aes_cbc(key, init_vec)

    def plain_block_size(self):
        """Block size derived from the key size of the cipher algorithm."""
        # NOTE(review): uses the algorithm's key size rather than a block
        # size attribute -- confirm this is intended for larger keys
        algorithm = self.cipher.algorithm
        return algorithm.key_size // 8

    def encrypted_block_size(self):
        """Same value as plain_block_size()."""
        algorithm = self.cipher.algorithm
        return algorithm.key_size // 8

    def decrypt(self, data):
        """Return uacrypto.cipher_decrypt of `data` with the stored cipher."""
        cipher = self.cipher
        return uacrypto.cipher_decrypt(cipher, data)
297
298
299 1
class SecurityPolicyBasic128Rsa15(SecurityPolicy):
    """
    Security Basic 128Rsa15
    A suite of algorithms that uses RSA15 as Key-Wrap-algorithm
    and 128-Bit (16 bytes) for encryption algorithms.
    - SymmetricSignatureAlgorithm - HmacSha1
      (http://www.w3.org/2000/09/xmldsig#hmac-sha1)
    - SymmetricEncryptionAlgorithm - Aes128
      (http://www.w3.org/2001/04/xmlenc#aes128-cbc)
    - AsymmetricSignatureAlgorithm - RsaSha1
      (http://www.w3.org/2000/09/xmldsig#rsa-sha1)
    - AsymmetricKeyWrapAlgorithm - KwRsa15
      (http://www.w3.org/2001/04/xmlenc#rsa-1_5)
    - AsymmetricEncryptionAlgorithm - Rsa15
      (http://www.w3.org/2001/04/xmlenc#rsa-1_5)
    - KeyDerivationAlgorithm - PSha1
      (http://docs.oasis-open.org/ws-sx/ws-secureconversation/200512/dk/p_sha1)
    - DerivedSignatureKeyLength - 128 (16 bytes)
    - MinAsymmetricKeyLength - 1024 (128 bytes)
    - MaxAsymmetricKeyLength - 2048 (256 bytes)
    - CertificateSignatureAlgorithm - Sha1

    If a certificate or any certificate in the chain is not signed with
    a hash that is Sha1 or stronger then the certificate shall be rejected.
    """

    URI = "http://opcfoundation.org/UA/SecurityPolicy#Basic128Rsa15"
    signature_key_size = 16
    symmetric_key_size = 16

    def __init__(self, server_cert, client_cert, client_pk, mode):
        """
        Set up the asymmetric cryptography from the certificates and the
        private key, and create the (still key-less) symmetric cryptography
        for `mode`. `server_cert` may be given as DER bytes.
        """
        require_cryptography(self)
        if isinstance(server_cert, bytes):
            server_cert = uacrypto.x509_from_der(server_cert)

        # even in Sign mode we need to asymmetrically encrypt secrets
        # transmitted in OpenSecureChannel. So SignAndEncrypt here
        asym = self.asymmetric_cryptography = Cryptography(
            MessageSecurityMode.SignAndEncrypt)
        asym.Signer = SignerRsa(client_pk)
        asym.Verifier = VerifierRsa(server_cert)
        # 11 is passed as padding_size -- presumably the RSA PKCS#1 v1.5
        # padding overhead; confirm
        asym.Encryptor = EncryptorRsa(server_cert, uacrypto.encrypt_rsa15, 11)
        asym.Decryptor = DecryptorRsa(client_pk, uacrypto.decrypt_rsa15, 11)

        self.symmetric_cryptography = Cryptography(mode)
        self.Mode = mode
        self.server_certificate = uacrypto.der_from_x509(server_cert)
        self.client_certificate = uacrypto.der_from_x509(client_cert)

    def make_symmetric_key(self, nonce1, nonce2):
        """
        Derive the symmetric keys from the two nonces with uacrypto.p_sha1
        and install signer/encryptor and verifier/decryptor on the
        symmetric cryptography.
        """
        # sizes requested for (signature key, encryption key, init vector)
        key_sizes = (self.signature_key_size, self.symmetric_key_size, 16)
        sym = self.symmetric_cryptography

        # signing/encrypting side: derived from (nonce2, nonce1)
        sign_key, enc_key, enc_iv = uacrypto.p_sha1(nonce2, nonce1, key_sizes)
        sym.Signer = SignerAesCbc(sign_key)
        sym.Encryptor = EncryptorAesCbc(enc_key, enc_iv)

        # verifying/decrypting side: same derivation with the nonces swapped
        check_key, dec_key, dec_iv = uacrypto.p_sha1(nonce1, nonce2, key_sizes)
        sym.Verifier = VerifierAesCbc(check_key)
        sym.Decryptor = DecryptorAesCbc(dec_key, dec_iv)
358
359
360 1
class SecurityPolicyBasic256(SecurityPolicy):
    """
    Security Basic 256
    A suite of algorithms that are for 256-Bit (32 bytes) encryption,
    algorithms include:
    - SymmetricSignatureAlgorithm - HmacSha1
      (http://www.w3.org/2000/09/xmldsig#hmac-sha1)
    - SymmetricEncryptionAlgorithm - Aes256
      (http://www.w3.org/2001/04/xmlenc#aes256-cbc)
    - AsymmetricSignatureAlgorithm - RsaSha1
      (http://www.w3.org/2000/09/xmldsig#rsa-sha1)
    - AsymmetricKeyWrapAlgorithm - KwRsaOaep
      (http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p)
    - AsymmetricEncryptionAlgorithm - RsaOaep
      (http://www.w3.org/2001/04/xmlenc#rsa-oaep)
    - KeyDerivationAlgorithm - PSha1
      (http://docs.oasis-open.org/ws-sx/ws-secureconversation/200512/dk/p_sha1)
    - DerivedSignatureKeyLength - 192 (24 bytes)
    - MinAsymmetricKeyLength - 1024 (128 bytes)
    - MaxAsymmetricKeyLength - 2048 (256 bytes)
    - CertificateSignatureAlgorithm - Sha1

    If a certificate or any certificate in the chain is not signed with
    a hash that is Sha1 or stronger then the certificate shall be rejected.
    """

    URI = "http://opcfoundation.org/UA/SecurityPolicy#Basic256"
    signature_key_size = 24
    symmetric_key_size = 32

    def __init__(self, server_cert, client_cert, client_pk, mode):
        """
        Set up the asymmetric cryptography from the certificates and the
        private key, and create the (still key-less) symmetric cryptography
        for `mode`. `server_cert` may be given as DER bytes.
        """
        require_cryptography(self)
        if isinstance(server_cert, bytes):
            server_cert = uacrypto.x509_from_der(server_cert)

        # even in Sign mode we need to asymmetrically encrypt secrets
        # transmitted in OpenSecureChannel. So SignAndEncrypt here
        asym = self.asymmetric_cryptography = Cryptography(
            MessageSecurityMode.SignAndEncrypt)
        asym.Signer = SignerRsa(client_pk)
        asym.Verifier = VerifierRsa(server_cert)
        # 42 is passed as padding_size -- presumably the RSA OAEP (SHA-1)
        # padding overhead; confirm
        asym.Encryptor = EncryptorRsa(
            server_cert, uacrypto.encrypt_rsa_oaep, 42)
        asym.Decryptor = DecryptorRsa(
            client_pk, uacrypto.decrypt_rsa_oaep, 42)

        self.symmetric_cryptography = Cryptography(mode)
        self.Mode = mode
        self.server_certificate = uacrypto.der_from_x509(server_cert)
        self.client_certificate = uacrypto.der_from_x509(client_cert)

    def make_symmetric_key(self, nonce1, nonce2):
        """
        Derive the symmetric keys from the two nonces with uacrypto.p_sha1
        and install signer/encryptor and verifier/decryptor on the
        symmetric cryptography.
        """
        # specs part 6, 6.7.5
        # sizes requested for (signature key, encryption key, init vector)
        key_sizes = (self.signature_key_size, self.symmetric_key_size, 16)
        sym = self.symmetric_cryptography

        # signing/encrypting side: derived from (nonce2, nonce1)
        sign_key, enc_key, enc_iv = uacrypto.p_sha1(nonce2, nonce1, key_sizes)
        sym.Signer = SignerAesCbc(sign_key)
        sym.Encryptor = EncryptorAesCbc(enc_key, enc_iv)

        # verifying/decrypting side: same derivation with the nonces swapped
        check_key, dec_key, dec_iv = uacrypto.p_sha1(nonce1, nonce2, key_sizes)
        sym.Verifier = VerifierAesCbc(check_key)
        sym.Decryptor = DecryptorAesCbc(dec_key, dec_iv)
420