Completed
Push — master ( 07aed8...cec7a8 )
by Olivier
02:28
created

opcua.sign_sha1()   A

Complexity

Conditions 2

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 4.5186
Metric Value
dl 0
loc 7
ccs 1
cts 7
cp 0.1429
rs 9.4286
cc 2
crap 4.5186
1 1
from Crypto.Util.asn1 import DerSequence
2 1
from ssl import PEM_cert_to_DER_cert
3 1
from ssl import DER_cert_to_PEM_cert
4 1
import base64
5 1
import hashlib
6
7
8 1
from Crypto.Signature import PKCS1_v1_5
9 1
from Crypto.Hash import SHA256
10 1
from Crypto.Hash import SHA
11 1
from Crypto.PublicKey import RSA
12 1
from Crypto.Cipher import AES
13 1
from Crypto.Cipher import PKCS1_OAEP
14 1
from Crypto import Hash 
15 1
from Crypto import Random
16
17 1
BS = 16
18 1
pad = lambda s: s + (BS - len(s) % BS) * chr(BS - len(s) % BS) 
19 1
unpad = lambda s: s[:-ord(s[len(s) - 1:])]
20
21
22 1
def dem_to_der(data):
23
    """
24
    ssh.PEM_cert_to_DER_cert seems to have issues with python3 bytes, so we wrap it
25
    """
26
    data = PEM_cert_to_DER_cert(data.decode("utf8"))
27
    return data
28
29
30 1
def encrypt_aes(key, raw):
31
    #key = hashlib.sha256(key.encode()).digest()
32
    key = key.exportKey(format="DER")
33
    raw = pad(raw)
34
    iv = Random.new().read(AES.block_size)
35
    cipher = AES.new(key, AES.MODE_CBC, iv)
36
    return base64.b64encode(iv + cipher.encrypt(raw)) 
37
38
39 1
def decrypt_aes(key, enc):
40
    enc = base64.b64decode(enc)
41
    iv = enc[:16]
42
    cipher = AES.new(key, AES.MODE_CBC, iv)
43
    return unpad(cipher.decrypt(enc[16:]))
44
45
46 1
def encrypt_rsa_oaep(privkey, data):
47
    if not type(privkey) is RSA._RSAobj:
48
        privkey = RSA.importKey(privkey)
49
    cipher = PKCS1_OAEP.new(privkey, Hash.SHA256)
50
    #cipher = PKCS1_OAEP.new(privkey, Hash.SHA)
51
    ciphertext = cipher.encrypt(data)
52
    return ciphertext
53
54
55 1
def pubkey_from_dercert(der):
56
    cert = DerSequence()
57
    cert.decode(der)
58
    tbsCertificate = DerSequence()
59
    tbsCertificate.decode(cert[0])
60
    subjectPublicKeyInfo = tbsCertificate[6]
61
62
    # Initialize RSA key
63
    rsa_key = RSA.importKey(subjectPublicKeyInfo)
64
    return rsa_key
65
66
67 1
def sign_sha256(key, data):
68
    if not type(key) is RSA._RSAobj:
69
        key = RSA.importKey(key)
70
    myhash = SHA256.new(data).digest()
71
    signature = key.sign(myhash, '')
72
    return signature
73
74
75 1
def sign_sha1(key, data):
76
    if not type(key) is RSA._RSAobj:
77
        key = RSA.importKey(key)
78
    myhash = SHA.new(data)
79
    signer = PKCS1_v1_5.new(key)
80
    signature = signer.sign(myhash)
81
    return signature
82
83
    
84
85
86 1
if __name__ == "__main__":
87
    import OpenSSL
88
    # Convert from PEM to DER
89
    pem = open("../examples/server_cert.pem").read()
90
    der = PEM_cert_to_DER_cert(pem)
91
    rsa_pubkey = pubkey_from_dercert(der)
92
    priv_pem = open("../examples/mykey.pem").read()
93
    rsa_privkey = RSA.importKey(priv_pem)
94
    #lines = pem.replace(" ",'').split()
95
    #der = a2b_base64(''.join(lines[1:-1]))
96
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
97
98
    
99
    # Extract subjectPublicKeyInfo field from X.509 certificate (see RFC3280)
100
    #cert = DerSequence()
101
    #cert.decode(der)
102
    #tbsCertificate = DerSequence()
103
    #tbsCertificate.decode(cert[0])
104
    #subjectPublicKeyInfo = tbsCertificate[6]
105
106
    # Initialize RSA key
107
    #rsa_key = RSA.importKey(subjectPublicKeyInfo)
108
    print("Pub Key", rsa_pubkey)
109
    print("Priv Key", rsa_privkey)
110
    msg = encrypt256(rsa_privkey, b"this is my message")
111
    print("Encrypted data: ", msg)
112
    from IPython import embed
113
    embed()
114