Passed
Push — main ( cf8f99...b508f6 )
by Sat CFDI
09:22 queued 04:02
created

satcfdi.portal.utils.verify_token()   A

Complexity

Conditions 1

Size

Total Lines 18
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 12
nop 2
dl 0
loc 18
ccs 10
cts 10
cp 1
crap 1
rs 9.8
c 0
b 0
f 0
1 1
import base64
2 1
import random
3 1
import string
4 1
from urllib.parse import urlparse, urlunparse
5
6 1
from bs4 import BeautifulSoup
7 1
from requests import Response
8 1
from requests.adapters import HTTPAdapter
9 1
from urllib3.util import create_urllib3_context
10
11 1
from ..models import Signer, Certificate
12
13
14
# Ciphers compatible with SAT Services
15 1
CIPHERS = (
16
    'ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:ECDH+AESGCM:'
17
    'DH+AESGCM:ECDH+AES:DH+AES:RSA+AESGCM:RSA+AES:!aNULL:!eNULL:!MD5:!DSS'
18
    ':HIGH:!DH'
19
)
20
21
22 1
class SSLAdapter(HTTPAdapter):
23 1
    def init_poolmanager(self, *args, **kwargs):
24 1
        kwargs['ssl_context'] = create_urllib3_context(ciphers=CIPHERS)
25 1
        return super().init_poolmanager(*args, **kwargs)
26
27 1
    def proxy_manager_for(self, *args, **kwargs):
28
        kwargs['ssl_context'] = create_urllib3_context(ciphers=CIPHERS)
29
        return super().proxy_manager_for(*args, **kwargs)
30
31
32
33 1
def generate_token(signer: Signer, code: str) -> bytes:
34 1
    rfc = signer.rfc
35 1
    num_serie = signer.certificate_number
36
37 1
    co = code + "|" + rfc + "|" + num_serie
38
39 1
    signature = signer.sign_sha1(data=co.encode())
40
41 1
    return base64.b64encode(
42
        base64.b64encode(co.encode()) + b"#" + base64.b64encode(signature.encode())
43
    )
44
45
46 1
def verify_token(certificate: Certificate, token: bytes) -> str:
47 1
    decoded_token = base64.b64decode(token)
48 1
    code, signature = decoded_token.split(b"#", maxsplit=2)
49
50 1
    code = base64.b64decode(code)
51 1
    signature = base64.b64decode(signature)
52
53 1
    assert certificate.verify_sha1(
54
        data=code,
55
        signature=base64.b64decode(signature)
56
    )
57
58 1
    code, rfc, num_serie = code.decode().split("|", maxsplit=3)
59
60 1
    assert rfc == certificate.rfc
61 1
    assert num_serie == certificate.certificate_number
62
63 1
    return code
64
65
66 1
def action_url(action: str | None, url: str):
67 1
    if not action:
68 1
        return url
69
70 1
    if action.startswith('/'):
71 1
        parts = urlparse(url)
72 1
        return urlunparse((parts.scheme, parts.netloc, action, None, None, None))
73
74 1
    return action
75
76
77 1
def get_form(res: Response, id=None):
78
    html = BeautifulSoup(res.text, 'html.parser')
79
    if id:
80
        form = html.find(id=id)
81
    else:
82
        form = html.select('form')[0]
83
84
    data = {
85
        i.attrs['name']: i.attrs.get('value')
86
        for i in form.findChildren('input')
87
        if 'name' in i.attrs
88
    }
89
90
    assert form.attrs['method'].upper() == "POST"
91
    return action_url(form.attrs.get('action'), res.url), data
92
93
94 1
def request_ref_headers(url):
95
    parts = urlparse(url)
96
    return {
97
        'origin': urlunparse((parts.scheme, parts.netloc, '', '', '', '')),
98
        'referer': url
99
    }
100
101
102 1
def request_verification_token(res: Response):
103
    html = BeautifulSoup(res.text, 'html.parser')
104
    return html.find(name='input', attrs={'name': '__RequestVerificationToken'}).attrs['value']
105
106
107 1
def random_ajax_id():
108
    return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(5))
109