satcfdi.certifica._calculate_code()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 9
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 7
nop 1
dl 0
loc 9
ccs 7
cts 7
cp 1
crap 1
rs 10
c 0
b 0
f 0
1 1
import base64
2 1
import hashlib
3 1
import io
4 1
import os.path
5 1
from datetime import datetime
6 1
from random import randbytes
7
8 1
from cryptography.hazmat.primitives import hashes, serialization
9 1
from cryptography.hazmat.primitives.asymmetric import padding, rsa
10 1
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
11 1
from cryptography.hazmat.primitives.serialization import pkcs7
12
13 1
from .pkcs7 import create_pkcs7
14 1
from ..models import Signer, Certificate, RFC, RFCType
15 1
from ..ans1e import *
16 1
from ..models.certificate import CertificateType
17 1
from ..zip import zip_create, ZipData
18
19 1
ENCODING = 'windows-1252'
20
21
22 1
def _p_join(dirname, filename):
    """Return the path of ``filename`` inside ``dirname``, creating the directory.

    :param dirname: target directory; ``None`` or an empty string means
        "no directory", in which case ``filename`` is returned untouched.
    :param filename: name of the file to place inside ``dirname``.
    :return: ``filename`` itself, or ``os.path.join(dirname, filename)``.
    """
    # An empty string used to reach os.makedirs(''), which raises
    # FileNotFoundError; treat it the same as "no directory given".
    if not dirname:
        return filename
    os.makedirs(dirname, exist_ok=True)
    return os.path.join(dirname, filename)
27
28
29 1
class Certifica:
    """Create private-key files and certificate request files for a FIEL signer.

    Each public method generates a new 2048-bit RSA key, stores it
    password-encrypted in a ``.key`` file and writes the matching request
    file.  File names carry a ``%Y%m%d_%H%M%S`` timestamp taken from
    ``datetime.now()``.  ``dirname`` is resolved through ``_p_join``: ``None``
    yields bare file names, anything else is created on demand.
    """

    def __init__(self, signer: Signer):
        """Store ``signer``.

        :raises ValueError: if the signer's certificate type is not ``Fiel``.
        """
        self.signer = signer
        if self.signer.type != CertificateType.Fiel:
            raise ValueError("'signer' no es Fiel")

    @staticmethod
    def _save(dirname, filename, payload):
        """Write the bytes ``payload`` to ``filename`` resolved against ``dirname``."""
        with open(_p_join(dirname, filename), 'wb') as out:
            out.write(payload)

    def generacion_fiel(self, password: str, rfc: str, curp: str, correo: str = None, dirname: str = None):
        """Write a new key file and an unsigned ``.req`` request.

        :param password: passphrase used to encrypt the generated key.
        :param rfc: RFC placed in the request subject.
        :param curp: CURP placed in the request subject.
        :param correo: e-mail address placed in the request subject.
        :param dirname: output directory, or ``None``.

        Both file names are built from the signer's RFC, not from ``rfc``.
        """
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        key_name = f"Claveprivada_FIEL_{self.signer.rfc}_{stamp}.key"
        new_key = self._generate_key(_p_join(dirname, key_name), password)

        request_name = f"Renovacion_FIEL_{self.signer.rfc}_{stamp}.req"
        request = _create_generacion_certificate_signing_request(
            private_key=new_key, rfc=rfc, curp=curp, correo=correo
        )
        self._save(dirname, request_name, request)

    def renovacion_fiel(self, password: str, correo: str = None, dirname: str = None):
        """Write a new key file and a PKCS#7-wrapped ``.ren`` renewal request.

        :param password: passphrase used to encrypt the generated key.
        :param correo: e-mail for the request; when falsy the address found in
            the signer's certificate subject is used instead.
        :param dirname: output directory, or ``None``.
        """
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        key_name = f"Claveprivada_FIEL_{self.signer.rfc}_{stamp}.key"
        new_key = self._generate_key(_p_join(dirname, key_name), password)

        request_name = f"Renovacion_FIEL_{self.signer.rfc}_{stamp}.ren"
        request = _create_renovation_certificate_signing_request(
            signer=self.signer, private_key=new_key, correo=correo
        )
        self._save(dirname, request_name, self._pkcs7_package(request))

    def renovacion_fiel_moral(self, password: str, rfc: str, correo: str, dirname: str = None):
        """Write a new key file and a PKCS#7-wrapped ``.ren`` request for a moral RFC.

        :param password: passphrase used to encrypt the generated key.
        :param rfc: RFC of the entity; it is parsed with ``RFC`` and its
            string form is used for the file names and the request subject.
        :param correo: e-mail address placed in the request subject.
        :param dirname: output directory, or ``None``.
        :raises ValueError: if ``rfc`` is not of type ``RFCType.MORAL``.
        """
        parsed = RFC(rfc)
        if parsed.type != RFCType.MORAL:
            raise ValueError("RFC no es Moral")
        rfc = str(parsed)

        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        key_name = f"Claveprivada_FIEL_{rfc}_{stamp}.key"
        new_key = self._generate_key(_p_join(dirname, key_name), password)

        request_name = f"Renovacion_FIEL_{rfc}_{stamp}.ren"
        request = _create_renovation_moral_certificate_signing_request(
            signer=self.signer,
            private_key=new_key,
            rfc=rfc,
            correo=correo,
        )
        self._save(dirname, request_name, self._pkcs7_package(request))

    def solicitud_certificado(self, sucursal: str, password: str, dirname: str = None):
        """Write a new key file and a PKCS#7-wrapped ``.sdg`` CSD request.

        :param sucursal: branch name; spaces become underscores in the key
            file name, and the value is passed on to the request builder.
        :param password: passphrase used to encrypt the generated key.
        :param dirname: output directory, or ``None``.
        """
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        key_name = f"CSD_{sucursal.replace(' ', '_')}_{self.signer.rfc}_{stamp}.key"
        new_key = self._generate_key(_p_join(dirname, key_name), password)

        package_name = f"CSD_{self.signer.rfc}_{stamp}.sdg"
        zipped_request = _create_certificate_signing_request_zip(self.signer, new_key, sucursal)
        self._save(dirname, package_name, self._pkcs7_package(zipped_request))

    @staticmethod
    def _generate_key(key_filename, password):
        """Generate an RSA key, persist it encrypted, and return the key object.

        The key is 2048 bits with public exponent 65537 and is written to
        ``key_filename`` as DER-encoded PKCS#8, encrypted with ``password``.
        """
        private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

        protection = serialization.BestAvailableEncryption(password.encode())
        encrypted_der = private_key.private_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=protection,
        )

        with open(key_filename, 'wb') as out:
            out.write(encrypted_der)

        return private_key

    def _pkcs7_package(self, data):
        """Wrap ``data`` with ``create_pkcs7`` using this signer and SHA-1."""
        return create_pkcs7(data=data, signer=self.signer, hash_algorithm=hashes.SHA1())
112
113
114 1
def _create_certificate_signing_request_zip(signer: Signer, private_key: RSAPrivateKey, sucursal: str):
    """Build the CSD request and return it packed as a single-entry zip archive.

    :param signer: signer whose certificate subject feeds the request.
    :param private_key: freshly generated key that signs the request.
    :param sucursal: branch name; spaces become underscores in the entry name.
    :return: the bytes of the in-memory zip archive.
    """
    request_bytes = _create_certificate_signing_request(
        signer=signer, private_key=private_key, sucursal=sucursal
    )

    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    entry_name = f"CSD_{sucursal.replace(' ', '_')}_{signer.rfc}_{stamp}s.req"

    def write_request(stream):
        # Callback handed to ZipData: emits the request into the zip entry.
        return stream.write(request_bytes)

    with io.BytesIO() as archive:
        zip_create(archive, files=[ZipData(entry_name, write_request)])
        return archive.getvalue()
123
124
125 1
def _create_generacion_certificate_signing_request(private_key: RSAPrivateKey, rfc: str, curp: str, correo: str):
    """Build the request for a brand-new FIEL from caller-supplied identity data.

    The subject holds, in this order: ``rfc`` under OID 2.5.4.45, ``curp``
    under OID 2.5.4.5 (both encoded with ``ENCODING``) and ``correo`` under
    OID 1.2.840.113549.1.9.1.  The challenge code is random.

    :return: the encoded, signed request produced by ``_certificate_request``.
    """
    challenge = _calculate_code_random()

    rfc_entry = (rfc.encode(ENCODING), Numbers.PrintableString)
    curp_entry = (curp.encode(ENCODING), Numbers.PrintableString)
    email_entry = (correo, Numbers.IA5String)

    # Insertion order is the order in which the attributes are encoded.
    subject = {
        '2.5.4.45': rfc_entry,
        '2.5.4.5': curp_entry,
        '1.2.840.113549.1.9.1': email_entry,
    }

    return _certificate_request(private_key, subject, challenge)
135
136
137 1
def _create_renovation_certificate_signing_request(signer: Signer, private_key: RSAPrivateKey, correo):
    """Build a FIEL renewal request that reuses the signer's certificate subject.

    ``x500UniqueIdentifier`` and ``serialNumber`` are copied from the current
    certificate; the e-mail is ``correo`` when truthy, otherwise the
    certificate's ``emailAddress``.  The challenge code is random.

    :return: the encoded, signed request produced by ``_certificate_request``.
    """
    challenge = _calculate_code_random()
    current = dict(signer.certificate.get_subject().get_components())

    subject = {
        '2.5.4.45': (current[b'x500UniqueIdentifier'], Numbers.PrintableString),
        '2.5.4.5': (current[b'serialNumber'], Numbers.PrintableString),
        # The certificate's address is only looked up when no e-mail was given.
        '1.2.840.113549.1.9.1': (correo or current[b'emailAddress'], Numbers.IA5String),
    }
    return _certificate_request(private_key, subject, challenge)
150
151
152 1
def _create_renovation_moral_certificate_signing_request(signer: Signer, private_key: RSAPrivateKey, rfc, correo):
    """Build a FIEL renewal request for a moral RFC, signed by ``signer``.

    The subject combines the given ``rfc`` with values from the signer's
    certificate: OID 2.5.4.45 is ``"<rfc> / <x500UniqueIdentifier>"`` and
    OID 2.5.4.5 is ``" / <serialNumber>"``.  The challenge code is random.

    :return: the encoded, signed request produced by ``_certificate_request``.
    """
    challenge = _calculate_code_random()
    current = dict(signer.certificate.get_subject().get_components())
    separator = b' / '

    unique_id = rfc.encode(ENCODING) + separator + current[b'x500UniqueIdentifier']
    serial = separator + current[b'serialNumber']

    subject = {
        '2.5.4.45': (unique_id, Numbers.PrintableString),
        '2.5.4.5': (serial, Numbers.PrintableString),
        '1.2.840.113549.1.9.1': (correo, Numbers.IA5String),
    }
    return _certificate_request(private_key, subject, challenge)
165
166
167 1
def _create_certificate_signing_request(signer: Signer, private_key: RSAPrivateKey, sucursal: str):
    """Build the CSD request for the branch ``sucursal`` from the signer's subject.

    The subject carries the certificate's ``x500UniqueIdentifier`` and
    ``serialNumber``, then the holder name (``O`` under OID 2.5.4.10 for a
    moral RFC, ``CN`` under OID 2.5.4.3 otherwise) and finally ``sucursal``
    under OID 2.5.4.11.  The challenge code is derived from the certificate
    by ``_calculate_code``.

    :return: the encoded, signed request produced by ``_certificate_request``.
    """
    challenge = _calculate_code(signer)
    current = dict(signer.certificate.get_subject().get_components())

    subject = {
        '2.5.4.45': (current[b'x500UniqueIdentifier'], Numbers.PrintableString),
        '2.5.4.5': (current[b'serialNumber'], Numbers.PrintableString),
    }

    is_moral = RFC(signer.rfc).type == RFCType.MORAL
    name_oid, name_field = ('2.5.4.10', b'O') if is_moral else ('2.5.4.3', b'CN')
    subject[name_oid] = (current[name_field].decode(ENCODING), None)

    # The branch goes last; insertion order is the encoding order.
    subject['2.5.4.11'] = (sucursal, None)

    return _certificate_request(private_key, subject, challenge)
185
186
187 1
def _certificate_request(private_key: RSAPrivateKey, subject: dict, code: bytes):
    """Encode and sign a certificate request for ``subject``.

    :param private_key: key whose public half is embedded and which signs
        the request (PKCS#1 v1.5 padding, SHA-1).
    :param subject: mapping ``oid -> (value, nr)``; entries are emitted in
        iteration order, each as its own SET containing one SEQUENCE.
    :param code: challenge value stored under OID 1.2.840.113549.1.9.7.
    :return: the bytes produced by the outer encoder.
    """
    rsa_public_der = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.PKCS1,
    )

    # --- request info: version, subject, public key, attributes ---
    info = Ans1Encoder()
    with info.seq():
        info(0)
        with info.seq():
            for oid, (value, tag) in subject.items():
                with info.set(), info.seq():
                    info.oid(oid)
                    info(value, nr=tag)
        with info.seq():
            with info.seq():
                info.oid('1.2.840.113549.1.1.1')  # rsaEncryption
                info()  # presumably an ASN.1 NULL for the parameters - confirm in Ans1Encoder
            info(rsa_public_der, nr=Numbers.BitString)
        with info.enter(nr=0, cls=Classes.Context), info.seq():
            info.oid('1.2.840.113549.1.9.7')  # challengePassword
            with info.set():
                info(code, nr=Numbers.PrintableString)
    request_info = info.output()

    # --- outer envelope: request info, signature algorithm, signature ---
    envelope = Ans1Encoder()
    with envelope.seq():
        envelope.write(request_info)
        with envelope.seq():
            envelope.oid('1.2.840.113549.1.1.5')  # sha1WithRSAEncryption
            envelope()
        signature = private_key.sign(
            data=request_info,
            padding=padding.PKCS1v15(),
            algorithm=hashes.SHA1(),
        )
        envelope(signature, nr=Numbers.BitString)
    return envelope.output()
222
223
224 1
def _calculate_code(certificate: "Certificate"):
    """Derive the deterministic challenge code from a certificate's subject.

    With ``ui`` being the subject's ``x500UniqueIdentifier`` and
    ``digest(x) = base64(sha1(x))``, the result is
    ``digest(ui + digest(ui + ui))``.

    :param certificate: object whose ``certificate.get_subject().get_components()``
        yields ``(name, value)`` pairs of bytes.
    :return: the base64-encoded code as bytes.
    :raises ValueError: if the subject has no ``x500UniqueIdentifier``.
    """
    components = certificate.certificate.get_subject().get_components()
    # A bare next() would leak StopIteration to the caller when the attribute
    # is missing; report the real problem instead.
    ui = next((v for k, v in components if k == b'x500UniqueIdentifier'), None)
    if ui is None:
        raise ValueError("El certificado no contiene 'x500UniqueIdentifier'")

    def digest(value):
        return base64.b64encode(hashlib.sha1(value).digest())

    return digest(ui + digest(ui + ui))
233
234
235 1
def _calculate_code_random():
    """Return a random challenge code: 20 random bytes, base64-encoded.

    The value ends up as the request's challenge password, so it is drawn
    from the operating system's CSPRNG rather than from the ``random``
    module, whose generator is not suitable for security purposes.

    :return: 28 bytes of base64 text.
    """
    return base64.b64encode(os.urandom(20))
237