Passed
Push — main ( e3a6d2...e30615 )
by Sat CFDI
01:54
created

satdigitalinvoice.utils.first_duplicate()   A

Complexity

Conditions 3

Size

Total Lines 7
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 7
rs 10
c 0
b 0
f 0
cc 3
nop 1
1
import logging
2
import os
3
import random
4
import shutil
5
from datetime import datetime
6
from uuid import UUID
7
8
from satcfdi import Signer, DatePeriod
9
10
11
def parse_date_period(periodo):
12
    fmt, period = try_parsing_date(periodo)
13
    if fmt == '%Y':
14
        return DatePeriod(period.year)
15
    if fmt == '%Y-%m':
16
        return DatePeriod(period.year, period.month)
17
    if fmt == '%Y-%m-%d':
18
        return DatePeriod(period.year, period.month, period.day)
19
20
21
def try_parsing_date(text):
22
    for fmt in ('%Y', '%Y-%m', '%Y-%m-%d'):
23
        try:
24
            return fmt, datetime.strptime(text, fmt)
25
        except ValueError:
26
            pass
27
    raise ValueError('no valid date format found')
28
29
30
def parse_ym_date(periodo):
31
    return try_parsing_date(periodo)[1]
32
33
34
def to_uuid(s):
35
    try:
36
        return UUID(s)
37
    except ValueError:
38
        return None
39
40
41
def random_string():
42
    chars = "0123456789abcdefghijklmnopqrstuvwxzyABCDEFGHIJKLMNOPQRSTUVWXZY"
43
    return "".join(random.choice(chars) for _ in range(32))
44
45
46
def add_file_handler():
47
    fh = logging.FileHandler(
48
        f'.data/errors.log',
49
        mode='a',
50
        encoding='utf-8',
51
    )
52
    fh.setLevel(logging.ERROR)
53
    formatter = logging.Formatter('%(asctime)s - %(message)s')
54
    fh.setFormatter(formatter)
55
    logging.root.addHandler(fh)
56
57
58
def convert_ans1_date(ans1_date):
59
    return datetime.strptime(ans1_date.decode('utf-8'), '%Y%m%d%H%M%SZ')
60
61
62
def cert_info(signer: Signer):
63
    if signer:
64
        return {
65
            "NoCertificado": signer.certificate_number,
66
            "Tipo": str(signer.type),
67
68
            "organizationName": signer.certificate.get_subject().O,
69
            "x500UniqueIdentifier": signer.certificate.get_subject().x500UniqueIdentifier,
70
            "serialNumber": signer.certificate.get_subject().serialNumber,
71
            "organizationUnitName": signer.certificate.get_subject().OU,
72
            "emailAddress": signer.certificate.get_subject().emailAddress,
73
74
            "notAfter": convert_ans1_date(signer.certificate.get_notAfter()),
75
            "notBefore": convert_ans1_date(signer.certificate.get_notBefore()),
76
        }
77
78
79
def find_best_match(cases, emission_date):
80
    fk, fv = (None, None)
81
    for k, v in cases.items():
82
        k = parse_ym_date(k)
83
        if k <= emission_date:
84
            if fk is None or k > fk:
85
                fk, fv = k, v
86
    return fk, fv
87
88
89
def clear_directory(directory):
90
    shutil.rmtree(directory, ignore_errors=True)
91
    os.makedirs(directory, exist_ok=True)
92
93
94
# calculate the number of months between two dates
95
def months_between(d1, d2):
96
    return (d1.year - d2.year) * 12 + d1.month - d2.month
97
98
99
def load_certificate(data):
100
    if 'data' in data:
101
        return Signer.load_pkcs12(
102
            **data,
103
        )
104
    else:
105
        return Signer.load(
106
            **data,
107
        )
108
109
110
def first_duplicate(seq):
111
    seen = set()
112
    for x in seq:
113
        if x in seen:
114
            return x
115
        seen.add(x)
116
    return None