Passed
Push — main ( bbe170...cf8f99 )
by Sat CFDI
05:27
created

satcfdi.portal.SSLAdapter.proxy_manager_for()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.2963

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 3
dl 0
loc 3
ccs 1
cts 3
cp 0.3333
crap 1.2963
rs 10
c 0
b 0
f 0
1 1
import json
2 1
import pickle
3 1
from time import time
4
5 1
import requests
6 1
from requests.adapters import HTTPAdapter
7 1
from urllib3.util.ssl_ import create_urllib3_context
8 1
from requests.structures import CaseInsensitiveDict
9
10 1
from .utils import get_form, generate_token, request_ref_headers, request_verification_token, random_ajax_id
11 1
from ..models import Signer
12 1
from ..exceptions import ResponseError
13
14
15
# Cipher list compatible with SAT services.
# The three fragments are concatenated as-is into a single colon-separated
# cipher string, which is handed to create_urllib3_context(ciphers=...).
CIPHERS = ''.join([
    'ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:ECDH+AESGCM:',
    'DH+AESGCM:ECDH+AES:DH+AES:RSA+AESGCM:RSA+AES:!aNULL:!eNULL:!MD5:!DSS',
    ':HIGH:!DH',
])
21
22
23 1
class SSLAdapter(HTTPAdapter):
    """HTTPAdapter that injects an SSL context restricted to ``CIPHERS``.

    Both the direct pool manager and any proxy manager are created with an
    ``ssl_context`` keyword built from the module-level cipher string; any
    ``ssl_context`` supplied by the caller is overwritten.
    """

    @staticmethod
    def _sat_ssl_context():
        """Return a new urllib3 SSL context configured with ``CIPHERS``."""
        return create_urllib3_context(ciphers=CIPHERS)

    def init_poolmanager(self, *args, **kwargs):
        """Create the pool manager, forcing the SAT-compatible SSL context."""
        kwargs.update(ssl_context=self._sat_ssl_context())
        return super().init_poolmanager(*args, **kwargs)

    def proxy_manager_for(self, *args, **kwargs):
        """Create the proxy manager, forcing the SAT-compatible SSL context."""
        kwargs.update(ssl_context=self._sat_ssl_context())
        return super().proxy_manager_for(*args, **kwargs)
31
32
33 1
class PortalManager(requests.Session):
    """``requests.Session`` shared by the SAT portal clients in this module.

    On construction it mounts ``SSLAdapter`` for ``https://`` URLs, stores the
    signer used for the e.firma (FIEL) login, and replaces the session headers
    with a fixed browser-like set.
    """

    def __init__(self, signer: Signer):
        """Set up the session.

        :param signer: signer whose certificate and key are used by
            ``fiel_login`` to build the login token.
        """
        super().__init__()
        self.mount('https://', SSLAdapter())
        self.signer = signer

        default_headers = {
            "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36',
            "Accept-Encoding": 'gzip, deflate, br',
            "Accept": 'text/html,application/xhtml+xml,application/xml',
            "Connection": "keep-alive",
            'Pragma': 'no-cache',
            'Cache-Control': 'no-cache',
        }
        self.headers = CaseInsensitiveDict(default_headers)

    def save_session(self, target):
        """Pickle the session's cookie jar into the binary file object *target*."""
        cookie_jar = self.cookies
        pickle.dump(cookie_jar, target)

    def load_session(self, source):
        """Merge cookies unpickled from the binary file object *source*.

        NOTE(review): ``pickle.load`` can execute arbitrary code; *source*
        must only ever be data previously written by ``save_session`` from a
        trusted location.
        """
        restored = pickle.load(source)
        self.cookies.update(restored)

    def form(self, action, referer_url, data):
        """POST *data* to *action* with referer headers for *referer_url*.

        :return: the response object.
        :raises AssertionError: if the response status is not 200
            (note: ``assert`` is skipped when Python runs with ``-O``).
        """
        referer_headers = request_ref_headers(referer_url)
        response = self.post(url=action, headers=referer_headers, data=data)
        assert response.status_code == 200
        return response

    def fiel_login(self, login_response):
        """Submit the ``certform`` form found in *login_response*.

        The form fields are extended with a ``token`` generated from the
        signer and the form's ``guid`` field, and a ``fert`` value taken from
        the certificate's ``notAfter`` with its first two bytes dropped.

        :return: the response of the form submission.
        """
        action, fields = get_form(login_response, id='certform')
        referer_url = login_response.request.url
        signed_fields = {
            'token': generate_token(self.signer, code=fields['guid']),
            'fert': self.signer.certificate.get_notAfter()[2:].decode(),
        }
        return self.form(action, referer_url, fields | signed_fields)
75
76
77 1
class SATPortal(PortalManager):
    """Client for the SAT portal served at ``BASE_URL``."""

    BASE_URL = 'https://loginda.siat.sat.gob.mx'

    def login(self):
        """Open the FIEL login page and complete the certificate login.

        :return: the response of the final certificate-form submission.
        :raises AssertionError: if the login page does not answer with 200.
        """
        landing = self.get(url=f'{self.BASE_URL}/nidp/app/login?id=fiel')
        assert landing.status_code == 200

        action, data = get_form(landing)
        cert_page = self.form(action, landing.request.url, data)
        return self.fiel_login(login_response=cert_page)

    def home_page(self):
        """GET the portal application page (``sid=0``) and return the response."""
        home_url = f'{self.BASE_URL}/nidp/app?sid=0'
        return self.get(url=home_url)

    def logout(self):
        """GET the logout endpoint without following redirects."""
        referer = {'referer': f'{self.BASE_URL}/nidp/app?sid=0'}
        return self.get(
            url=f'{self.BASE_URL}/nidp/app/logout',
            headers=referer,
            allow_redirects=False,
        )

    def declaraciones_provisionales_login(self):
        """Open the provisional-declarations site and submit its first form.

        :return: the response of the form submission.
        :raises AssertionError: if the initial page does not answer with 200.
        """
        entry = self.get(
            url='https://ptscdecprov.clouda.sat.gob.mx',
            allow_redirects=True,
        )
        assert entry.status_code == 200

        action, data = get_form(entry)
        return self.form(action, entry.request.url, data)
115
116
117 1
class SATFacturaElectronica(PortalManager):
    """Client for the SAT electronic-invoice portal at ``BASE_URL``.

    Keeps a request-verification token and an ajax id that are sent as
    headers on every call made through ``_request``.
    """

    BASE_URL = 'https://portal.facturaelectronica.sat.gob.mx'
    REQUEST_CONTEXT = 'appId=cid-v1:20ff76f4-0bca-495f-b7fd-09ca520e39f7'

    def __init__(self, signer: Signer):
        """Create the session with a fresh ajax id and no verification token."""
        super().__init__(signer)
        self._ajax_id = random_ajax_id()
        self._request_verification_token = None

    def login(self):
        """Log into the portal, going through the FIEL login when required.

        If the first form points at ``https://cfdiau.sat.gob.mx/`` the FIEL
        certificate login is performed first; otherwise the form is submitted
        directly. Afterwards the verification token is read from the final
        response and a new ajax id is generated.

        :return: the final response.
        :raises ValueError: if no login form is found on the landing page.
        :raises AssertionError: on an unexpected status code or form action.
        """
        page = self.get(url=self.BASE_URL)
        assert page.status_code == 200
        try:
            action, fields = get_form(page)
        except IndexError as ex:
            raise ValueError("Login form not found, please try again") from ex

        if action.startswith('https://cfdiau.sat.gob.mx/'):
            assert 'nidp/wsfed/ep?id=SATUPCFDiCon' in action

            # Redirect the federation endpoint to the certificate login form.
            fiel_action = action.replace('nidp/wsfed/ep?id=SATUPCFDiCon', 'nidp/app/login?id=SATx509Custom')
            cert_page = self.form(fiel_action, page.request.url, fields)
            page = self.fiel_login(login_response=cert_page)

            action, fields = get_form(page)
            page = self.form(action, page.request.url, fields)

            action, fields = get_form(page)

        page = self.form(action, page.request.url, fields)

        self._request_verification_token = request_verification_token(page)
        self._ajax_id = random_ajax_id()
        return page

    def _reload_verification_token(self):
        """Fetch the invoice page and refresh the verification token from it.

        :raises ValueError: if the page does not answer with 200 (redirects
            are not followed).
        """
        page = self.get(
            url=f'{self.BASE_URL}/Factura/GeneraFactura',
            allow_redirects=False,
        )
        if page.status_code != 200:
            raise ValueError('Please Login Again')
        self._request_verification_token = request_verification_token(page)

    def reactivate_session(self):
        """POST to the session re-activation endpoint and return the response."""
        ajax_headers = {
            'Origin': self.BASE_URL,
            'Request-Context': self.REQUEST_CONTEXT,
            'Request-Id': f'|{self._ajax_id}.{random_ajax_id()}'
        }
        return self.post(
            url=f'{self.BASE_URL}/Home/ReActiveSession',
            headers=ajax_headers,
            allow_redirects=False,
        )

    def _request(self, method, path, data=None, params=None):
        """Send an ajax-style request to ``BASE_URL/path`` and decode the JSON.

        The verification token is loaded first when none is cached.

        :return: the decoded JSON body.
        :raises ResponseError: if the status code is not 200.
        """
        if self._request_verification_token is None:
            self._reload_verification_token()

        ajax_headers = {
            'Origin': self.BASE_URL,
            'Authority': self.BASE_URL,
            'Request-Context': self.REQUEST_CONTEXT,
            '__RequestVerificationToken': self._request_verification_token,
            'Request-Id': f'|{self._ajax_id}.{random_ajax_id()}'  # e.g. |pR4Px.o0yAS
        }
        response = self.request(
            method=method,
            url=f'{self.BASE_URL}/{path}',
            headers=ajax_headers,
            data=data,
            params=params,
            allow_redirects=False,
        )
        if response.status_code != 200:
            raise ResponseError(response)
        return response.json()

    @staticmethod
    def _unwrap(payload):
        """Return ``payload['resultado']`` or raise if ``exitoso`` is falsy."""
        if not payload['exitoso']:
            raise ResponseError(payload)
        return payload['resultado']

    def legal_name_valid(self, rfc, legal_name):
        """Ask the portal to validate *legal_name* against *rfc*.

        Both values are upper-cased before being sent.

        :return: the ``resultado`` field of the reply.
        :raises ResponseError: if the reply's ``exitoso`` field is falsy.
        """
        form_data = {
            'rfcValidar': rfc.upper(),
            'razonSocial': legal_name.upper(),
        }
        reply = self._request(
            method='POST',
            path='Clientes/ValidaRazonSocialRFC',
            data=form_data,
        )
        return self._unwrap(reply)

    def rfc_valid(self, rfc):
        """Ask the portal whether *rfc* (upper-cased) exists.

        :return: the ``resultado`` field of the reply.
        :raises ResponseError: if the reply's ``exitoso`` field is falsy.
        """
        reply = self._request(
            method='POST',
            path='Clientes/ExisteLrfc',
            data={'rfcValidar': rfc.upper()},
        )
        return self._unwrap(reply)

    def lco_details(self, rfc, apply_border_region=True):
        """Query the ``Clientes/ValidaLco`` endpoint for *rfc*.

        The ``_`` query parameter carries the current time in milliseconds.

        :return: the reply after a second ``json.loads`` pass.
        """
        query = {
            'rfcValidar': rfc.upper(),
            'aplicaRegionFronteriza': apply_border_region,
            "_": int(time() * 1000)
        }
        reply = self._request(
            method='GET',
            path='Clientes/ValidaLco',
            params=query,
        )
        # NOTE(review): _request already JSON-decodes the body; presumably the
        # endpoint answers with a JSON-encoded string - confirm.
        return json.loads(reply)
238
239
240 1
class SATPortalConstancia(PortalManager):
    """Client that logs into the SAT SIAT portal and downloads the constancia."""

    BASE_URL = 'https://login.siat.sat.gob.mx'

    def login(self):
        """Open the FIEL sign-on page and complete the certificate login.

        :return: the landing response unchanged when ``get_form`` yields no
            action, otherwise the response of the certificate-form submission.
        :raises AssertionError: if the sign-on page does not answer with 200.
        """
        landing = self.get(url=f'{self.BASE_URL}/nidp/idff/sso?id=fiel_Aviso')
        assert landing.status_code == 200

        action, data = get_form(landing)
        if action is None:
            return landing

        cert_page = self.form(action, landing.request.url, data)
        return self.fiel_login(login_response=cert_page)

    def generar_constancia(self):
        """Log in, walk the portal's form chain and fetch the constancia.

        Steps: login, open the session broker, optionally run the SAML
        request/response form exchange, optionally post the
        ``formReimpAcuse`` ajax form, then GET the constancia page.

        :return: the raw body (``content``) of the final response.
        :raises AssertionError: on any non-200 status or unexpected SAML
            response action.
        """
        self.login()
        page = self.get(
            url="https://rfcampc.siat.sat.gob.mx/app/seg/SessionBroker?url=/PTSC/IdcSiat/autc/ReimpresionTramite/ConsultaTramite.jsf&parametro=c&idSessionBit=&idSessionBit=null",
            allow_redirects=True,
        )
        assert page.status_code == 200

        # Authentication request: only when the broker page asks for SAML SSO.
        action, fields = get_form(page)
        if action == "https://login.siat.sat.gob.mx/nidp/saml2/sso":
            page = self.form(action, page.request.url, fields)
            assert page.status_code == 200

            # Authentication response: must post back to the service provider.
            action, fields = get_form(page)
            assert action == "https://rfcampc.siat.sat.gob.mx/saml2/sp/acs/post"
            page = self.form(action, page.request.url, fields)
            assert page.status_code == 200

        # formReimpAcuse: partial-ajax post that reuses the page's ViewState.
        action, fields = get_form(page)
        if action == "https://rfcampc.siat.sat.gob.mx/PTSC/IdcSiat/autc/ReimpresionTramite/ConsultaTramite.jsf":
            view_state = fields['javax.faces.ViewState']
            ajax_fields = {
                'javax.faces.partial.ajax': "true",
                'javax.faces.source': 'formReimpAcuse:j_idt50',
                'javax.faces.partial.execute': '@all',
                'formReimpAcuse:j_idt50': 'formReimpAcuse:j_idt50',
                'formReimpAcuse': 'formReimpAcuse',
                'formReimpAcuse:tipoTramite_input': '0',
                'formReimpAcuse:tipoTramite_focus': '',
                # NOTE(review): key ends with ':' unlike its siblings - confirm.
                'formReimpAcuse:fechaInicio_input:': '',
                'formReimpAcuse:fechaFin_input': '',
                'formReimpAcuse:folio': '',
                'javax.faces.ViewState': view_state
            }
            page = self.form(action, page.request.url, ajax_fields)
            assert page.status_code == 200

        document = self.get(
            url='https://rfcampc.siat.sat.gob.mx/PTSC/IdcSiat/IdcGeneraConstancia.jsf'
        )
        assert document.status_code == 200
        return document.content
300
301
302 1
class SATPortalOpinionCumplimiento(PortalManager):
303 1
    BASE_URL = 'https://login.mat.sat.gob.mx'
304
305 1
    def login(self):
        """Open the dual e.firma/TOTP login page and submit its form three times.

        The form action and fields are parsed once from the landing page and
        re-posted unchanged; only the referer follows the previous response.

        :return: the response of the third submission.
        :raises AssertionError: if any response status is not 200.
        """
        login_url = f'{self.BASE_URL}/nidp/app/login?id=contr-dual-eFirma-totp'
        res = self.get(url=login_url)
        assert res.status_code == 200
        action, data = get_form(res)

        for _ in range(3):
            res = self.form(action, res.request.url, data)
            assert res.status_code == 200

        return res
322
323 1
    def generar_opinion_cumplimiento_login(self):
324
        res = self.get(
325
            url="https://ptsc32d.clouda.sat.gob.mx/?/reporteOpinion32DContribuyente",
326
            allow_redirects=True
327
        )
328
        assert res.status_code == 200
329
330
        # Execute Authentication Request
331
        action, data = get_form(res)
332
        if action.startswith("https://login.mat.sat.gob.mx/nidp//app/login"):
333
            res = self.form(action, res.request.url, data)
334
            assert res.status_code == 200
335
336
            # Execute Authentication Response
337
            action, data = get_form(res)
338
            res = self.form(action, res.request.url, data)
339
            assert res.status_code == 200
340