Completed
Push — master ( 271a32...7dfda5 )
by Olivier
16:15
created

opcua.crypto.x509_name_to_string()   A

Complexity

Conditions 2

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6
Metric Value
cc 2
dl 0
loc 3
ccs 0
cts 3
cp 0
crap 6
rs 10
1 1
import os
2
3 1
from cryptography import x509
4
from cryptography.exceptions import InvalidSignature
5
from cryptography.hazmat.backends import default_backend
6
from cryptography.hazmat.primitives import serialization
7
from cryptography.hazmat.primitives import hashes
8
from cryptography.hazmat.primitives import hmac
9
from cryptography.hazmat.primitives.asymmetric import padding
10
from cryptography.hazmat.primitives.ciphers import Cipher
11
from cryptography.hazmat.primitives.ciphers import algorithms
12
from cryptography.hazmat.primitives.ciphers import modes
13
14
15
def load_certificate(path):
16
    _, ext = os.path.splitext(path)
17
    with open(path, "rb") as f:
18
        if ext == ".pem":
19
            return x509.load_pem_x509_certificate(f.read(), default_backend())
20
        else:
21
            return x509.load_der_x509_certificate(f.read(), default_backend())
22
23
24
def x509_from_der(data):
25
    if not data:
26
        return None
27
    return x509.load_der_x509_certificate(data, default_backend())
28
29
30
def load_private_key(path):
31
    _, ext = os.path.splitext(path)
32
    with open(path, "rb") as f:
33
        if ext == ".pem":
34
            return serialization.load_pem_private_key(f.read(), password=None, backend=default_backend())
35
        else:
36
            return serialization.load_der_private_key(f.read(), password=None, backend=default_backend())
37
38
39
def der_from_x509(certificate):
40
    if certificate is None:
41
        return b""
42
    return certificate.public_bytes(serialization.Encoding.DER)
43
44
45
def sign_sha1(private_key, data):
46
    signer = private_key.signer(
47
        padding.PKCS1v15(),
48
        hashes.SHA1()
49
    )
50
    signer.update(data)
51
    return signer.finalize()
52
53
54
def verify_sha1(certificate, data, signature):
55
    verifier = certificate.public_key().verifier(
56
        signature,
57
        padding.PKCS1v15(),
58
        hashes.SHA1())
59
    verifier.update(data)
60
    verifier.verify()
61
62
63
def encrypt_basic256(public_key, data):
64
    ciphertext = public_key.encrypt(
65
        data,
66
        padding.OAEP(
67
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
68
            algorithm=hashes.SHA256(),
69
            label=None)
70
    )
71
    return ciphertext
72
73
74
def encrypt_rsa_oaep(public_key, data):
75
    ciphertext = public_key.encrypt(
76
        data,
77
        padding.OAEP(
78
            mgf=padding.MGF1(algorithm=hashes.SHA1()),
79
            algorithm=hashes.SHA1(),
80
            label=None)
81
    )
82
    return ciphertext
83
84
85
def encrypt_rsa15(public_key, data):
86
    ciphertext = public_key.encrypt(
87
        data,
88
        padding.PKCS1v15()
89
    )
90
    return ciphertext
91
92
93
def decrypt_rsa_oaep(private_key, data):
94
    text = private_key.decrypt(
95
        data,
96
        padding.OAEP(
97
            mgf=padding.MGF1(algorithm=hashes.SHA1()),
98
            algorithm=hashes.SHA1(),
99
            label=None)
100
    )
101
    return text
102
103
104
def decrypt_rsa15(private_key, data):
105
    text = private_key.decrypt(
106
        data,
107
        padding.PKCS1v15()
108
    )
109
    return text
110
111
112
def cipher_aes_cbc(key, init_vec):
113
    return Cipher(algorithms.AES(key), modes.CBC(init_vec), default_backend())
114
115
116
def cipher_encrypt(cipher, data):
117
    encryptor = cipher.encryptor()
118
    return encryptor.update(data) + encryptor.finalize()
119
120
121
def cipher_decrypt(cipher, data):
122
    decryptor = cipher.decryptor()
123
    return decryptor.update(data) + decryptor.finalize()
124
125
126
def hmac_sha1(key, message):
127
    hasher = hmac.HMAC(key, hashes.SHA1(), backend=default_backend())
128
    hasher.update(message)
129
    return hasher.finalize()
130
131
132
def sha1_size():
133
    return hashes.SHA1.digest_size
134
135
136
def p_sha1(secret, seed, sizes=()):
137
    """
138
    Derive one or more keys from secret and seed.
139
    (See specs part 6, 6.7.5 and RFC 2246 - TLS v1.0)
140
    Lengths of keys will match sizes argument
141
    """
142
    full_size = 0
143
    for size in sizes:
144
        full_size += size
145
146
    result = b''
147
    accum = seed
148
    while len(result) < full_size:
149
        accum = hmac_sha1(secret, accum)
150
        result += hmac_sha1(secret, accum + seed)
151
152
    parts = []
153
    for size in sizes:
154
        parts.append(result[:size])
155
        result = result[size:]
156
    return tuple(parts)
157
158
159
def x509_name_to_string(name):
160
    parts = ["{}={}".format(attr.oid._name, attr.value) for attr in name]
161
    return ', '.join(parts)
162
163
164
def x509_to_string(cert):
165
    """
166
    Convert x509 certificate to human-readable string
167
    """
168
    if cert.subject == cert.issuer:
169
        issuer = ' (self-signed)'
170
    else:
171
        issuer = ', issuer: {}'.format(x509_name_to_string(cert.issuer))
172
    # TODO: show more information
173
    return "{}{}, {} - {}".format(x509_name_to_string(cert.subject), issuer, cert.not_valid_before, cert.not_valid_after)
174
175
176
if __name__ == "__main__":
177
    # Convert from PEM to DER
178
    cert = load_certificate("../examples/server_cert.pem")
179
    #rsa_pubkey = pubkey_from_dercert(der)
180
    rsa_privkey = load_private_key("../examples/mykey.pem")
181
    
182
    from IPython import embed
183
    embed()
184