Completed
Push — master ( 49fbc0...e6a6db )
by Olivier
02:53
created

opcua.encrypt_rsa_oaep()   A

Complexity

Conditions 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2
Metric Value
dl 0
loc 9
ccs 0
cts 3
cp 0
rs 9.6667
cc 1
crap 2
1 1
import os
2
3 1
from cryptography import x509
4
from cryptography.exceptions import InvalidSignature
5
from cryptography.hazmat.backends import default_backend
6
from cryptography.hazmat.primitives import serialization
7
from cryptography.hazmat.primitives import hashes
8
from cryptography.hazmat.primitives import hmac
9
from cryptography.hazmat.primitives.asymmetric import padding
10
from cryptography.hazmat.primitives.ciphers import Cipher
11
from cryptography.hazmat.primitives.ciphers import algorithms
12
from cryptography.hazmat.primitives.ciphers import modes
13
14
15
def load_certificate(path):
16
    _, ext = os.path.splitext(path)
17
    with open(path, "br") as f:
18
        if ext == ".pem":
19
            return x509.load_pem_x509_certificate(f.read(), default_backend())
20
        else:
21
            return x509.load_der_x509_certificate(f.read(), default_backend())
22
23
24
def x509_from_der(data):
25
    if not data:
26
        return None
27
    return x509.load_der_x509_certificate(data, default_backend())
28
29
30
def x509_to_der(cert):
31
    if not data:
32
        return b''
33
    return cert.public_bytes(serialization.Encoding.DER)
34
35
36
def load_private_key(path):
37
    _, ext = os.path.splitext(path)
38
    with open(path, "br") as f:
39
        if ext == ".pem":
40
            return serialization.load_pem_private_key(f.read(), password=None, backend=default_backend())
41
        else:
42
            return serialization.load_der_private_key(f.read(), password=None, backend=default_backend())
43
44
45
def der_from_x509(certificate):
46
    if certificate is None:
47
        return b""
48
    return certificate.public_bytes(serialization.Encoding.DER)
49
50
51
def sign_sha1(private_key, data):
52
    signer = private_key.signer(
53
        padding.PKCS1v15(),
54
        hashes.SHA1()
55
    )
56
    signer.update(data)
57
    return signer.finalize()
58
59
60
def verify_sha1(certificate, data, signature):
61
    verifier = certificate.public_key().verifier(
62
        signature,
63
        padding.PKCS1v15(),
64
        hashes.SHA1())
65
    verifier.update(data)
66
    verifier.verify()
67
68
69
def encrypt_basic256(public_key, data):
70
    ciphertext = public_key.encrypt(
71
        data,
72
        padding.OAEP(
73
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
74
            algorithm=hashes.SHA256(),
75
            label=None)
76
    )
77
    return ciphertext
78
79
80
def encrypt_rsa_oaep(public_key, data):
81
    ciphertext = public_key.encrypt(
82
        data,
83
        padding.OAEP(
84
            mgf=padding.MGF1(algorithm=hashes.SHA1()),
85
            algorithm=hashes.SHA1(),
86
            label=None)
87
    )
88
    return ciphertext
89
90
91
def encrypt_rsa15(public_key, data):
92
    ciphertext = public_key.encrypt(
93
        data,
94
        padding.PKCS1v15()
95
    )
96
    return ciphertext
97
98
99
def decrypt_rsa_oaep(private_key, data):
100
    text = private_key.decrypt(
101
        data,
102
        padding.OAEP(
103
            mgf=padding.MGF1(algorithm=hashes.SHA1()),
104
            algorithm=hashes.SHA1(),
105
            label=None)
106
    )
107
    return text
108
109
110
def decrypt_rsa15(private_key, data):
111
    text = private_key.decrypt(
112
        data,
113
        padding.PKCS1v15()
114
    )
115
    return text
116
117
118
def cipher_aes_cbc(key, init_vec):
119
    return Cipher(algorithms.AES(key), modes.CBC(init_vec), default_backend())
120
121
122
def cipher_encrypt(cipher, data):
123
    encryptor = cipher.encryptor()
124
    return encryptor.update(data) + encryptor.finalize()
125
126
127
def cipher_decrypt(cipher, data):
128
    decryptor = cipher.decryptor()
129
    return decryptor.update(data) + decryptor.finalize()
130
131
132
def hash_hmac(key, message):
133
    hasher = hmac.HMAC(key, hashes.SHA1(), backend=default_backend())
134
    hasher.update(message)
135
    return hasher.finalize()
136
137
138
def sha1_size():
139
    return hashes.SHA1.digest_size
140
141
142
def p_sha1(key, body, sizes=()):
143
    """
144
    Derive one or more keys from key and body.
145
    Lengths of keys will match sizes argument
146
    """
147
    full_size = 0
148
    for size in sizes:
149
        full_size += size
150
151
    result = b''
152
    accum = body
153
    while len(result) < full_size:
154
        accum = hash_hmac(key, accum)
155
        result += hash_hmac(key, accum + body)
156
157
    parts = []
158
    for size in sizes:
159
        parts.append(result[:size])
160
        result = result[size:]
161
    return tuple(parts)
162
163
164
if __name__ == "__main__":
165
    # Convert from PEM to DER
166
    cert = load_certificate("../examples/server_cert.pem")
167
    #rsa_pubkey = pubkey_from_dercert(der)
168
    rsa_privkey = load_private_key("../examples/mykey.pem")
169
    
170
    from IPython import embed
171
    embed()
172