Passed
Push — main ( 66fd70...bee5a0 )
by Sat CFDI
04:37
created

satcfdi.certifica._p_join()   A

Complexity

Conditions 2

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2.032

Importance

Changes 0
Metric Value
cc 2
eloc 5
nop 2
dl 0
loc 5
ccs 4
cts 5
cp 0.8
crap 2.032
rs 10
c 0
b 0
f 0
1 1
import base64
2 1
import hashlib
3 1
import io
4 1
import os.path
5 1
from datetime import datetime
6 1
from random import randbytes
7
8 1
from cryptography.hazmat.primitives import hashes, serialization
9 1
from cryptography.hazmat.primitives.asymmetric import padding, rsa
10 1
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
11 1
from cryptography.hazmat.primitives.serialization import pkcs7
12
13 1
from ..models import Signer, Certificate, RFC, RFCType
14 1
from ..ans1e import *
15 1
from ..models.certificate import CertificateType
16 1
from ..zip import zip_create, ZipData
17
18 1
# Code page used in this module to convert certificate-subject text
# (RFC, CURP, organisation/common name) to bytes and back.
ENCODING = 'windows-1252'
19
20
21 1
def _p_join(dirname, filename):
22 1
    if dirname is None:
23
        return filename
24 1
    os.makedirs(dirname, exist_ok=True)
25 1
    return os.path.join(dirname, filename)
26
27
28 1
class Certifica:
    """Produce the key and request files for FIEL / CSD procedures.

    Each public method generates a fresh RSA private key, stores it
    password-encrypted on disk, and then writes the matching request file.
    The methods return nothing; their result is the pair of files.
    """

    def __init__(self, signer: Signer):
        """Keep *signer* for later use.

        :raises ValueError: if ``signer.type`` is not ``CertificateType.Fiel``.
        """
        self.signer = signer
        if self.signer.type != CertificateType.Fiel:
            raise ValueError("'signer' no es Fiel")

    def generacion_fiel(self, password: str, rfc: str, curp: str, correo: str = None, dirname: str = None):
        """Write a new private key and an unsigned ``.req`` request file.

        :param password: password used to encrypt the generated key file.
        :param rfc: RFC placed in the request subject.
        :param curp: CURP placed in the request subject.
        :param correo: e-mail placed in the request subject.
        :param dirname: output directory; ``None`` writes relative file names.
        """
        # NOTE(review): both file names use the signer's RFC (not *rfc*) and
        # the request is prefixed "Renovacion_" - confirm this is intended.
        # NOTE(review): *correo* is forwarded as-is, so the default ``None``
        # reaches the request builder - confirm the encoder accepts it.
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        key_path = _p_join(dirname, f"Claveprivada_FIEL_{self.signer.rfc}_{stamp}.key")
        new_key = self._generate_key(key_path, password)

        request_name = f"Renovacion_FIEL_{self.signer.rfc}_{stamp}.req"
        request = _create_generacion_certificate_signing_request(
            private_key=new_key,
            rfc=rfc,
            curp=curp,
            correo=correo,
        )
        with open(_p_join(dirname, request_name), 'wb') as out:
            out.write(request)

    def renovacion_fiel(self, password: str, correo: str = None, dirname: str = None):
        """Write a new private key and a PKCS#7-signed ``.ren`` renewal file.

        :param password: password used to encrypt the generated key file.
        :param correo: e-mail for the request; when falsy the request builder
            falls back to the address in the signer's certificate.
        :param dirname: output directory; ``None`` writes relative file names.
        """
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        key_path = _p_join(dirname, f"Claveprivada_FIEL_{self.signer.rfc}_{stamp}.key")
        new_key = self._generate_key(key_path, password)

        renewal_name = f"Renovacion_FIEL_{self.signer.rfc}_{stamp}.ren"
        request = _create_renovation_certificate_signing_request(
            signer=self.signer,
            private_key=new_key,
            correo=correo,
        )
        signed = self._pkcs7_package(request)
        with open(_p_join(dirname, renewal_name), 'wb') as out:
            out.write(signed)

    def renovacion_fiel_moral(self, password: str, rfc: str, correo: str, dirname: str = None):
        """Write a new key and a signed ``.ren`` renewal for a legal entity.

        :param password: password used to encrypt the generated key file.
        :param rfc: RFC of the legal entity; used in the request and file names.
        :param correo: e-mail placed in the request subject.
        :param dirname: output directory; ``None`` writes relative file names.
        :raises ValueError: if *rfc* is not of type ``RFCType.MORAL``.
        """
        parsed = RFC(rfc)
        if parsed.type != RFCType.MORAL:
            raise ValueError("RFC no es Moral")
        # From here on work with the string form produced by ``RFC``.
        rfc = str(parsed)

        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        key_path = _p_join(dirname, f"Claveprivada_FIEL_{rfc}_{stamp}.key")
        new_key = self._generate_key(key_path, password)

        renewal_name = f"Renovacion_FIEL_{rfc}_{stamp}.ren"
        request = _create_renovation_moral_certificate_signing_request(
            signer=self.signer,
            private_key=new_key,
            rfc=rfc,
            correo=correo,
        )
        signed = self._pkcs7_package(request)
        with open(_p_join(dirname, renewal_name), 'wb') as out:
            out.write(signed)

    def solicitud_certificado(self, sucursal: str, password: str, dirname: str = None):
        """Write a new key and a signed ``.sdg`` CSD request for *sucursal*.

        :param sucursal: branch name; spaces become underscores in the key
            file name, and the value is passed on to the request builder.
        :param password: password used to encrypt the generated key file.
        :param dirname: output directory; ``None`` writes relative file names.
        """
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        key_name = f"CSD_{sucursal.replace(' ', '_')}_{self.signer.rfc}_{stamp}.key"
        new_key = self._generate_key(_p_join(dirname, key_name), password)

        package_name = f"CSD_{self.signer.rfc}_{stamp}.sdg"
        zipped = _create_certificate_signing_request_zip(self.signer, new_key, sucursal)
        signed = self._pkcs7_package(zipped)
        with open(_p_join(dirname, package_name), 'wb') as out:
            out.write(signed)

    @staticmethod
    def _generate_key(key_filename, password):
        """Create a 2048-bit RSA key, save it encrypted, and return it.

        The key is written to *key_filename* as DER-encoded PKCS#8,
        encrypted with *password*.

        :return: the in-memory (unencrypted) private key object.
        """
        new_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

        protection = serialization.BestAvailableEncryption(password.encode())
        encrypted_der = new_key.private_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=protection,
        )

        with open(key_filename, 'wb') as out:
            out.write(encrypted_der)

        return new_key

    def _pkcs7_package(self, data):
        """Return *data* wrapped in a DER PKCS#7 signature made by the signer.

        Signs with the stored signer's certificate and key using SHA-1, with
        the ``NoCapabilities`` and ``Binary`` options.
        """
        signer_cert = self.signer.certificate.to_cryptography()
        signer_key = self.signer.key

        builder = pkcs7.PKCS7SignatureBuilder().set_data(data)
        builder = builder.add_signer(signer_cert, signer_key, hashes.SHA1())
        return builder.sign(
            serialization.Encoding.DER,
            [pkcs7.PKCS7Options.NoCapabilities, pkcs7.PKCS7Options.Binary],
        )
113
114
115 1
def _create_certificate_signing_request_zip(signer: Signer, private_key: RSAPrivateKey, sucursal: str):
    """Return the bytes of a zip archive containing the CSD request.

    The archive holds a single member: the request built for *signer*,
    *private_key* and *sucursal*, stored under a name made from the branch
    (spaces replaced by underscores), the signer's RFC and the current time.
    """
    request = _create_certificate_signing_request(signer=signer, private_key=private_key, sucursal=sucursal)

    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    member_name = f"CSD_{sucursal.replace(' ', '_')}_{signer.rfc}_{stamp}s.req"

    def _emit(stream):
        # Callback handed to ZipData: writes the request into the member.
        return stream.write(request)

    with io.BytesIO() as buffer:
        zip_create(buffer, files=[ZipData(member_name, _emit)])
        # Read the archive out before the buffer is closed by the ``with``.
        return buffer.getvalue()
124
125
126 1
def _create_generacion_certificate_signing_request(private_key: RSAPrivateKey, rfc: str, curp: str, correo: str):
    """Build the request for a brand-new FIEL from caller-supplied data.

    The subject carries *rfc* under OID 2.5.4.45, *curp* under 2.5.4.5 and
    *correo* under 1.2.840.113549.1.9.1; the challenge code is random.

    :return: whatever ``_certificate_request`` returns for that subject.
    """
    challenge = _calculate_code_random()

    # ``Numbers`` is not imported by name in this module; presumably it is
    # provided by the wildcard import of ``..ans1e`` - confirm.
    # Insertion order is kept because the subject is written in dict order.
    attributes = {}
    attributes['2.5.4.45'] = (rfc.encode(ENCODING), Numbers.PrintableString)
    attributes['2.5.4.5'] = (curp.encode(ENCODING), Numbers.PrintableString)
    # NOTE(review): unlike the two values above, *correo* is passed without
    # encoding, and callers may pass ``None`` - confirm the encoder copes.
    attributes['1.2.840.113549.1.9.1'] = (correo, Numbers.IA5String)

    return _certificate_request(private_key, attributes, challenge)
136
137
138 1
def _create_renovation_certificate_signing_request(signer: Signer, private_key: RSAPrivateKey, correo):
    """Build a renewal request that reuses the signer's certificate subject.

    The unique identifier and serial number are copied from the signer's
    certificate. The e-mail is *correo* when truthy, otherwise the
    certificate's own ``emailAddress``. The challenge code is random.

    :raises KeyError: if the certificate subject lacks a required component.
    """
    challenge = _calculate_code_random()
    # Map of subject component name (bytes) -> value (bytes).
    current = dict(signer.certificate.get_subject().get_components())

    # ``Numbers`` presumably comes from the wildcard import of ``..ans1e``.
    # Entries are added in the order they must appear in the subject.
    attributes = {}
    attributes['2.5.4.45'] = (current[b'x500UniqueIdentifier'], Numbers.PrintableString)
    attributes['2.5.4.5'] = (current[b'serialNumber'], Numbers.PrintableString)
    attributes['1.2.840.113549.1.9.1'] = (correo or current[b'emailAddress'], Numbers.IA5String)
    return _certificate_request(private_key, attributes, challenge)
151
152
153 1
def _create_renovation_moral_certificate_signing_request(signer: Signer, private_key: RSAPrivateKey, rfc, correo):
    """Build a renewal request for a legal entity.

    The unique-identifier attribute is ``<rfc> / <signer identifier>`` and
    the serial-number attribute is ``" / "`` followed by the signer's serial
    number, both taken from the signer's certificate subject. *correo* is
    used as given. The challenge code is random.

    :raises KeyError: if the certificate subject lacks a required component.
    """
    challenge = _calculate_code_random()
    # Map of subject component name (bytes) -> value (bytes).
    current = dict(signer.certificate.get_subject().get_components())

    separator = b' / '
    # ``Numbers`` presumably comes from the wildcard import of ``..ans1e``.
    attributes = {}
    attributes['2.5.4.45'] = (
        rfc.encode(ENCODING) + separator + current[b'x500UniqueIdentifier'],
        Numbers.PrintableString,
    )
    attributes['2.5.4.5'] = (separator + current[b'serialNumber'], Numbers.PrintableString)
    attributes['1.2.840.113549.1.9.1'] = (correo, Numbers.IA5String)
    return _certificate_request(private_key, attributes, challenge)
166
167
168 1
def _create_certificate_signing_request(signer: Signer, private_key: RSAPrivateKey, sucursal: str):
    """Build the CSD request for *sucursal* from the signer's certificate.

    The subject holds the signer's unique identifier and serial number, then
    the organisation (``O``, OID 2.5.4.10) for a MORAL RFC or the common
    name (``CN``, OID 2.5.4.3) otherwise, and finally *sucursal* under OID
    2.5.4.11. The challenge code is derived from the signer.

    :raises KeyError: if the certificate subject lacks a required component.
    """
    challenge = _calculate_code(signer)
    # Map of subject component name (bytes) -> value (bytes).
    current = dict(signer.certificate.get_subject().get_components())

    # ``Numbers`` presumably comes from the wildcard import of ``..ans1e``.
    attributes = {}
    attributes['2.5.4.45'] = (current[b'x500UniqueIdentifier'], Numbers.PrintableString)
    attributes['2.5.4.5'] = (current[b'serialNumber'], Numbers.PrintableString)

    # Legal entities are identified by organisation, everyone else by name.
    if RFC(signer.rfc).type == RFCType.MORAL:
        name_oid, name_field = '2.5.4.10', b'O'
    else:
        name_oid, name_field = '2.5.4.3', b'CN'
    attributes[name_oid] = (current[name_field].decode(ENCODING), None)
    attributes['2.5.4.11'] = (sucursal, None)

    return _certificate_request(private_key, attributes, challenge)
186
187
188 1
def _certificate_request(private_key: RSAPrivateKey, subject: dict, code: bytes):
    """Encode and sign a certificate request for *private_key*.

    The output is laid out like a PKCS #10 CertificationRequest: a request
    body (integer 0, subject, public key, challenge attribute) followed by
    the signature algorithm and a SHA-1 / PKCS#1 v1.5 signature over that
    body. This assumes the ``Ans1Encoder`` helpers emit the DER types their
    names suggest - confirm against ``..ans1e``.

    :param private_key: key whose public half goes into the request and
        which signs the request body.
    :param subject: mapping of OID string -> ``(value, nr)``; entries are
        written in the mapping's iteration order.
    :param code: value written under OID 1.2.840.113549.1.9.7.
    :return: the encoder output for the complete signed request.
    """
    rsa_public_der = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.PKCS1,
    )

    # ``Ans1Encoder``, ``Numbers`` and ``Classes`` are not imported by name
    # here; presumably they come from the wildcard import of ``..ans1e``.
    body = Ans1Encoder()
    with body.seq():
        body(0)
        # Subject: one SET { SEQUENCE { oid, value } } per entry.
        with body.seq():
            for oid, (value, tag) in subject.items():
                with body.set(), body.seq():
                    body.oid(oid)
                    body(value, nr=tag)
        # Public key info: algorithm 1.2.840.113549.1.1.1 plus the key bits.
        with body.seq():
            with body.seq():
                body.oid('1.2.840.113549.1.1.1')
                body()
            body(rsa_public_der, nr=Numbers.BitString)
        # Context-tagged [0] attributes holding the challenge code.
        with body.enter(nr=0, cls=Classes.Context), body.seq():
            body.oid('1.2.840.113549.1.9.7')
            with body.set():
                body(code, nr=Numbers.PrintableString)
    to_be_signed = body.output()

    signature = private_key.sign(
        data=to_be_signed,
        padding=padding.PKCS1v15(),
        algorithm=hashes.SHA1(),
    )

    request = Ans1Encoder()
    with request.seq():
        # The already-encoded body is copied in verbatim so the signature
        # covers exactly the bytes that are shipped.
        request.write(to_be_signed)
        with request.seq():
            request.oid('1.2.840.113549.1.1.5')
            request()
        request(signature, nr=Numbers.BitString)
    return request.output()
223
224
225 1
def _calculate_code(certificate: Certificate):
226 1
    ui = next(v for k, v in certificate.certificate.get_subject().get_components() if k == b'x500UniqueIdentifier')
227
228 1
    def digest(value):
229 1
        m = hashlib.sha1()
230 1
        m.update(value)
231 1
        return base64.b64encode(m.digest())
232
233 1
    return digest(ui + digest(ui + ui))
234
235
236 1
def _calculate_code_random():
237
    return base64.b64encode(randbytes(20))
238