Passed
Push — main ( c4be35...b86ad9 )
by Sat CFDI
01:42
created

satdigitalinvoice.gui_functions   B

Complexity

Total Complexity 49

Size/Duplication

Total Lines 313
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 49
eloc 222
dl 0
loc 313
rs 8.48
c 0
b 0
f 0

18 Functions

Rating   Name   Duplication   Size   Complexity  
A exportar_facturas() 0 45 3
A ajustes_directory() 0 2 1
A archivos_filename() 0 2 1
D generate_ingresos() 0 40 12
A format_concepto_desc() 0 7 1
A parse_periodo_mes_ajuste() 0 17 5
A preview_cfdis() 0 8 1
A mf_pago_fmt() 0 5 3
A periodicidad_desc() 0 15 4
A parse_fecha_pago() 0 18 5
A generate_pdf_template() 0 16 1
A period_desc() 0 2 1
A create_cfdi() 0 25 1
A archivos_folder() 0 4 2
A pago_factura() 0 15 1
A find_ajustes() 0 7 4
A parse_importe_pago() 0 5 2
A center_location() 0 2 1

How to fix   Complexity   

Complexity

Complex classes like satdigitalinvoice.gui_functions often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import logging
2
import os
3
from datetime import datetime, date
4
from decimal import Decimal
5
from decimal import InvalidOperation
6
from typing import Sequence
7
8
import xlsxwriter
9
from markdown2 import markdown
10
from satcfdi import DatePeriod
11
from satcfdi.accounting import filter_invoices_iter, filter_payments_iter, invoices_export, payments_export
12
from satcfdi.accounting.process import payments_groupby_receptor, payments_retentions_export
13
from satcfdi.create.cfd import cfdi40
14
from satcfdi.create.cfd.cfdi40 import Comprobante, PagoComprobante
15
from satcfdi.pacs import sat
16
from satcfdi.printer import Representable
17
# noinspection PyUnresolvedReferences
18
from satcfdi.transform.catalog import CATALOGS
19
from weasyprint import HTML, CSS
20
21
from . import SOURCE_DIRECTORY, ARCHIVOS_DIRECTORY, TEMP_DIRECTORY
22
from .environments import facturacion_environment
23
from .exceptions import ConsoleErrors
24
from .formatting_functions.common import fecha_mes
25
from .utils import add_month
26
27
# Module-level logger for this file.
logger = logging.getLogger(__name__)
# Go through setLevel() instead of assigning ``level`` directly: setLevel()
# validates the value and lets the logging machinery invalidate its cached
# "is this level enabled" lookups.
logger.setLevel(logging.INFO)

# Single module-level ``sat.SAT`` instance, created at import time.
sat_manager = sat.SAT()
31
32
# Maps each supported periodicity name to the month count that
# parse_periodo_mes_ajuste() returns for it.
PERIODICIDAD = dict(
    Mensual=1,
    Bimestral=2,
    Trimestral=3,
    Cuatrimestral=4,
    Semestral=6,
    Anual=12,
)
# strptime pattern (year-month-day) used when parsing the payment date text.
CALENDAR_FECHA_FMT = "%Y-%m-%d"
41
42
43
def create_cfdi(receptor_cif, factura_details, emisor_cif):
    """Build and process a CFDI 4.0 ``Comprobante`` for one receiver.

    Args:
        receptor_cif: Mapping providing the receiver's ``RazonSocial``,
            ``CodigoPostal`` and ``RegimenFiscal``.
        factura_details: Mapping providing ``Receptor`` (used as the receiver
            RFC), ``UsoCFDI``, ``MetodoPago``, ``FormaPago`` and
            ``Conceptos``. An optional ``Emisor`` entry is merged into the
            issuer data with ``|``.
        emisor_cif: Mapping providing the issuer's ``Rfc``, ``RazonSocial``,
            ``RegimenFiscal`` and ``CodigoPostal``.

    Returns:
        Whatever ``Comprobante.process()`` returns for the assembled invoice.
    """
    issuer = cfdi40.Emisor(
        rfc=emisor_cif['Rfc'],
        nombre=emisor_cif['RazonSocial'],
        regimen_fiscal=emisor_cif['RegimenFiscal'],
    )
    # Per-invoice issuer entries (if present) are merged on top of the defaults.
    issuer = issuer | factura_details.get('Emisor', {})

    expedition_zip = emisor_cif['CodigoPostal']
    receiver = cfdi40.Receptor(
        rfc=factura_details['Receptor'],
        nombre=receptor_cif['RazonSocial'],
        uso_cfdi=factura_details['UsoCFDI'],
        domicilio_fiscal_receptor=receptor_cif['CodigoPostal'],
        regimen_fiscal_receptor=receptor_cif['RegimenFiscal'],
    )

    # serie / folio are not supplied at this point.
    comprobante = cfdi40.Comprobante(
        emisor=issuer,
        lugar_expedicion=expedition_zip,
        receptor=receiver,
        metodo_pago=factura_details['MetodoPago'],
        forma_pago=factura_details['FormaPago'],
        conceptos=factura_details["Conceptos"],
    )
    return comprobante.process()
68
69
70
def parse_periodo_mes_ajuste(periodo_mes_ajuste: str):
    """Parse a ``"<periodicity>.<month>"`` specification.

    Args:
        periodo_mes_ajuste: Text made of a ``PERIODICIDAD`` key and a month
            number (1-12) separated by exactly one dot, e.g. ``"Mensual.1"``.

    Returns:
        A ``(months_in_period, adjustment_month)`` tuple: the ``PERIODICIDAD``
        value for the periodicity name and the month as an ``int``.

    Raises:
        ValueError: Always with the message ``"Periodo Invalido"``, when the
            text does not have exactly two dot-separated parts, the month is
            not a decimal number in 1..12, or the periodicity is unknown.
    """
    parts = periodo_mes_ajuste.split(".")
    if len(parts) != 2:
        raise ValueError("Periodo Invalido")

    periodo, mes_text = parts
    # str.isnumeric() also accepts characters such as "½" or "²" which int()
    # cannot convert; isdecimal() only admits digits int() understands, so
    # malformed input consistently surfaces as "Periodo Invalido".
    if not mes_text.isdecimal():
        raise ValueError("Periodo Invalido")

    mes_ajuste = int(mes_text)
    if not 1 <= mes_ajuste <= 12:
        raise ValueError("Periodo Invalido")

    if periodo not in PERIODICIDAD:
        raise ValueError("Periodo Invalido")

    return PERIODICIDAD[periodo], mes_ajuste
87
88
89
def format_concepto_desc(concepto, periodo):
    """Return a copy of ``concepto`` with its ``Descripcion`` rendered.

    The ``Descripcion`` value is compiled as a template through
    ``facturacion_environment.from_string`` and rendered with ``periodo``
    passed as a template variable. The copy is obtained via
    ``concepto.copy()``, so the ``Descripcion`` of the argument itself is
    left untouched.
    """
    rendered = concepto.copy()
    description_template = facturacion_environment.from_string(rendered["Descripcion"])
    rendered["Descripcion"] = description_template.render(periodo=periodo)
    return rendered
96
97
98
def period_desc(dp: DatePeriod):
    """Describe the month of ``dp`` by passing its first day to ``fecha_mes``."""
    first_of_month = date(dp.year, dp.month, 1)
    return fecha_mes(first_of_month)
100
101
102
def periodicidad_desc(dp: DatePeriod, periodo_mes_ajuste, offset):
    """Describe the period that starts on ``dp``, if one starts there.

    Args:
        dp: Period whose ``month`` is tested against the periodicity.
        periodo_mes_ajuste: Specification accepted by
            ``parse_periodo_mes_ajuste``.
        offset: Optional number of months passed to ``add_month`` to shift
            the described period; falsy values mean no shift.

    Returns:
        The description of the first month, followed by ``" AL "`` and the
        description of the last month when the period spans more than one
        month; ``None`` when ``dp.month`` is not aligned with the adjustment
        month for the given periodicity.

    Raises:
        ValueError: Propagated from ``parse_periodo_mes_ajuste``.
    """
    months_per_period, adjustment_month = parse_periodo_mes_ajuste(periodo_mes_ajuste)

    # The alignment test uses the un-shifted month; the offset only affects
    # which months end up in the description.
    if (dp.month - adjustment_month) % months_per_period != 0:
        return None

    start = add_month(dp, offset) if offset else dp
    description = period_desc(start)
    if months_per_period > 1:
        end = add_month(start, months_per_period - 1)
        description += " AL " + period_desc(dp=end)
    return description
117
118
119
def generate_ingresos(clients, facturas, dp, emisor_rfc):
    """Create the CFDIs that apply to period ``dp`` for the given facturas.

    Args:
        clients: Mapping of RFC to client data; must contain ``emisor_rfc``.
        facturas: Iterable of factura mappings with ``Receptor``,
            ``MetodoPago``, ``FormaPago``, ``Conceptos`` and optionally
            ``Total``. NOTE: when a factura has applicable conceptos its
            ``Conceptos`` entry is replaced in place with the formatted ones.
        dp: Period forwarded to ``periodicidad_desc`` for every concepto.
        emisor_rfc: RFC of the issuer, looked up in ``clients``.

    Returns:
        List with one processed CFDI per factura that has at least one
        applicable concepto.

    Raises:
        KeyError: If ``emisor_rfc`` is not present in ``clients``.
        ConsoleErrors: If any validation problem was collected (unknown
            receiver, PPD with a FormaPago other than ``'99'``, invalid
            periodicity, or a ``Total`` that differs from the computed one).
    """
    emisor_cif = clients[emisor_rfc]
    errors = []

    def prepare_concepto(c):
        # Formatted concepto when it applies to ``dp``; None when it does not
        # apply, has no ValorUnitario, or its periodicity is invalid.
        try:
            periodo = periodicidad_desc(
                dp,
                c['_periodo_mes_ajuste'],
                c.get('_desfase_mes')
            )
            if periodo and c['ValorUnitario'] is not None:
                return format_concepto_desc(c, periodo=periodo)
        except ValueError as ex:
            errors.append(str(ex))
        return None

    def facturas_iter():
        for f in facturas:
            receptor_cif = clients.get(f['Receptor'])
            if not receptor_cif:
                errors.append(f"{f['Receptor']}: client not found")
                continue

            if f["MetodoPago"] == "PPD" and f["FormaPago"] != "99":
                errors.append(f"{f['Receptor']}: FormaPago '{f['FormaPago']}' is invalid, expected '99' for PPD")
                continue

            conceptos = [x for x in (prepare_concepto(c) for c in f["Conceptos"]) if x]
            if conceptos:
                f["Conceptos"] = conceptos
                cfdi = create_cfdi(receptor_cif, f, emisor_cif)
                expected_total = f.get('Total')
                if expected_total is not None and expected_total != cfdi['Total']:
                    errors.append(f"{f['Receptor']}: Total '{expected_total}' is invalid, expected '{cfdi['Total']}'")

                yield cfdi

    # facturas_iter() is a lazy generator: it has to be fully consumed BEFORE
    # ``errors`` is inspected, otherwise the list is still empty at the check
    # and every validation problem is silently ignored.
    cfdis = list(facturas_iter())
    if errors:
        raise ConsoleErrors("Some errors were found", errors=errors)
    return cfdis
159
160
161
def parse_fecha_pago(fecha_pago):
    """Validate a payment date string and turn it into a ``datetime``.

    Args:
        fecha_pago: Date text in ``CALENDAR_FECHA_FMT`` format.

    Returns:
        Noon of the given day, or the current time when noon of that day is
        still in the future.

    Raises:
        ValueError: If the value is empty, cannot be parsed, lies in the
            future, or is more than 35 days old.
    """
    if not fecha_pago:
        raise ValueError("Fecha de Pago es requerida")

    parsed = datetime.strptime(fecha_pago, CALENDAR_FECHA_FMT)
    if parsed > datetime.now():
        raise ValueError("Fecha de Pago es mayor a la fecha actual")

    # Use noon of the payment day unless noon has not happened yet, in which
    # case the current time is used so the result is never in the future.
    noon = parsed.replace(hour=12)
    result = datetime.now() if noon > datetime.now() else noon

    age = datetime.now() - result
    if age.days > 35:
        raise ValueError("Fecha de Pago es de hace mas de 35 dias")

    return result
179
180
181
def parse_importe_pago(importe_pago: str):
    """Parse a payment amount and round it to two decimal places.

    Args:
        importe_pago: Text representation of the amount.

    Returns:
        The amount as a ``Decimal`` rounded to 2 decimal places.

    Raises:
        ValueError: With the message ``"Importe de Pago es invalido"`` when
            the text is not a finite decimal number or cannot be rounded.
    """
    try:
        amount = Decimal(importe_pago)
        # "NaN" is accepted by Decimal() and passes through round() unchanged,
        # so non-finite values have to be rejected explicitly.
        rounded = round(amount, 2) if amount.is_finite() else None
    except InvalidOperation as ex:
        raise ValueError("Importe de Pago es invalido") from ex

    if rounded is None:
        raise ValueError("Importe de Pago es invalido")
    return rounded
186
187
188
def pago_factura(factura_pagar, fecha_pago: datetime, forma_pago: str, importe_pago: Decimal = None):
    """Create and process a payment comprobante for a single invoice.

    Args:
        factura_pagar: Invoice being paid; its ``ultima_num_parcialidad`` and
            ``saldo_pendiente`` attributes are read.
        fecha_pago: Payment date passed to ``Comprobante.pago_comprobantes``.
        forma_pago: Payment form passed to ``Comprobante.pago_comprobantes``.
        importe_pago: Amount paid, forwarded as ``imp_pagado``; defaults to
            ``None``.

    Returns:
        Whatever ``process()`` returns for the payment comprobante.
    """
    pago = PagoComprobante(
        comprobante=factura_pagar,
        # This payment is the installment following the last recorded one.
        num_parcialidad=factura_pagar.ultima_num_parcialidad + 1,
        imp_saldo_ant=factura_pagar.saldo_pendiente,
        imp_pagado=importe_pago,
    )
    payment_invoice = Comprobante.pago_comprobantes(
        comprobantes=[pago],
        fecha_pago=fecha_pago,
        forma_pago=forma_pago,
    )
    return payment_invoice.process()
203
204
205
def find_ajustes(facturas, mes_ajuste):
    """Yield ``(rfc, concepto)`` for each concepto adjusted in ``mes_ajuste``.

    Every concepto's ``_periodo_mes_ajuste`` is parsed with
    ``parse_periodo_mes_ajuste``; the concepto is yielded together with the
    factura's ``Receptor`` when the parsed month equals ``mes_ajuste``.

    Raises:
        ValueError: Propagated from ``parse_periodo_mes_ajuste`` while the
            generator is being consumed.
    """
    for factura in facturas:
        receptor_rfc = factura["Receptor"]
        yield from (
            (receptor_rfc, concepto)
            for concepto in factura["Conceptos"]
            if parse_periodo_mes_ajuste(concepto['_periodo_mes_ajuste'])[1] == mes_ajuste
        )
212
213
214
def archivos_folder(dp: DatePeriod):
    """Return the folder under ``ARCHIVOS_DIRECTORY`` for period ``dp``.

    The result is ``<ARCHIVOS_DIRECTORY>/<year>`` when ``dp.month`` is falsy
    and ``<ARCHIVOS_DIRECTORY>/<year>/<year>-<MM>`` otherwise.
    """
    if not dp.month:
        return os.path.join(ARCHIVOS_DIRECTORY, str(dp.year))

    year = str(dp.year)
    return os.path.join(ARCHIVOS_DIRECTORY, year, f"{year}-{dp.month:02d}")
218
219
220
def archivos_filename(dp: DatePeriod, ext="xlsx"):
    """Return the path ``<archivos_folder(dp)>/<dp>.<ext>`` for period ``dp``."""
    folder = archivos_folder(dp)
    filename = "{}.{}".format(dp, ext)
    return os.path.join(folder, filename)
222
223
224
def exportar_facturas(all_invoices, dp: DatePeriod, emisor_cif, rfc_prediales):
    """Export the invoices and payments of period ``dp`` to an Excel workbook.

    Args:
        all_invoices: Mapping of invoices; its ``values()`` feed the invoice
            filters and the mapping itself feeds the payment filters.
        dp: Period used as the ``fecha`` filter and to build the file names.
        emisor_cif: Mapping with the issuer's ``Rfc`` and ``RegimenFiscal``
            (the latter must expose a ``code`` attribute).
        rfc_prediales: Container of issuer RFCs whose received payments go to
            the ``PREDIALES`` sheet.

    Returns:
        Path of the written workbook.

    Side effects:
        Creates the target folder if needed, writes the workbook and, when
        ``dp.month`` is ``None``, also writes a ``retenciones.txt`` file.
    """
    rfc = emisor_cif['Rfc']
    regimen = emisor_cif['RegimenFiscal']

    def has_iva_for_regimen(pago):
        # Payment carries transferred tax and its receiver regime is either
        # the issuer's regime or not specified.
        traslados = pago.impuestos.get("Traslados", {}).values()
        if not sum(t.get("Importe", 0) for t in traslados) > 0:
            return False
        return pago.comprobante["Receptor"].get("RegimenFiscalReceptor") in (regimen, None)

    # Issued documents. The invoice filter result is handed to the export
    # as-is; the payments are materialised because they are reused below.
    emitidas = filter_invoices_iter(invoices=all_invoices.values(), fecha=dp, rfc_emisor=rfc)
    emitidas_pagos = list(filter_payments_iter(invoices=all_invoices, fecha=dp, rfc_emisor=rfc))

    # Received documents.
    recibidas = filter_invoices_iter(invoices=all_invoices.values(), fecha=dp, rfc_receptor=rfc)
    recibidas_pagos = list(filter_payments_iter(invoices=all_invoices, fecha=dp, rfc_receptor=rfc))

    pagos_hechos_iva = [pago for pago in recibidas_pagos if has_iva_for_regimen(pago)]
    prediales = [pago for pago in recibidas_pagos if pago.comprobante["Emisor"]["Rfc"] in rfc_prediales]

    archivo_excel = archivos_filename(dp)
    os.makedirs(os.path.dirname(archivo_excel), exist_ok=True)

    workbook = xlsxwriter.Workbook(archivo_excel)

    # Issued
    invoices_export(workbook, "EMITIDAS", emitidas)
    payments_export(workbook, "EMITIDAS PAGOS", emitidas_pagos)

    # Received
    invoices_export(workbook, "RECIBIDAS", recibidas)
    payments_export(workbook, "RECIBIDAS PAGOS", recibidas_pagos)

    # Special sheets
    payments_export(workbook, f"RECIBIDAS PAGOS IVA {regimen.code}", pagos_hechos_iva)
    if prediales:
        payments_export(workbook, "PREDIALES", prediales)

    # Retentions are only exported for periods without a month.
    if dp.month is None:
        archivo_retenciones = archivos_filename(dp, ext="retenciones.txt")
        payments_retentions_export(archivo_retenciones, payments_groupby_receptor(emitidas_pagos))

    workbook.close()
    return archivo_excel
269
270
271
def generate_pdf_template(template_name, fields):
    """Render a Markdown template to PDF.

    Args:
        template_name: Name of the template looked up in
            ``facturacion_environment``.
        fields: Context passed to the template's ``render`` call.

    Returns:
        The value returned by WeasyPrint's ``write_pdf`` with ``target=None``
        (the PDF content rather than a written file).
    """
    template = facturacion_environment.get_template(template_name)
    markdown_source = template.render(fields)
    html_source = markdown(markdown_source)

    document = HTML(string=html_source)
    stylesheets = [
        os.path.join(SOURCE_DIRECTORY, "markdown_styles", "markdown6.css"),
        CSS(string='@page { width: Letter; margin: 1.6cm 1.6cm 1.6cm 1.6cm; }'),
    ]
    return document.write_pdf(target=None, stylesheets=stylesheets)
287
288
289
def mf_pago_fmt(cfdi):
    """Return a fixed-layout summary of a CFDI's type and payment codes.

    For type ``"I"`` the result is ``"<type> <MetodoPago> <FormaPago>"`` with
    FormaPago ``'99'`` shown as two blanks; for any other type it is the type
    code followed by seven blanks.
    """
    tipo = cfdi['TipoDeComprobante']
    if tipo == "I":
        tipo_code = tipo.code
        metodo_code = cfdi['MetodoPago'].code
        forma_code = cfdi['FormaPago'].code
        if forma_code == '99':
            forma_code = '  '
        return ' '.join((tipo_code, metodo_code, forma_code))
    return tipo.code + ' ' * 7
294
295
296
def ajustes_directory(dp: DatePeriod):
    """Return the ``ajustes`` sub-folder inside ``archivos_folder(dp)``."""
    period_folder = archivos_folder(dp)
    return os.path.join(period_folder, 'ajustes')
298
299
300
def preview_cfdis(cfdis):
    """Write ``cfdis`` to a temporary HTML file and open it for preview.

    The HTML is written to ``factura.html`` inside ``TEMP_DIRECTORY`` via
    ``Representable.html_write_all``. On Windows the file is opened with
    ``os.startfile``; elsewhere (where ``os.startfile`` does not exist) it is
    opened through the ``webbrowser`` module.

    Args:
        cfdis: Objects passed as ``objs`` to ``Representable.html_write_all``.
    """
    # Imported locally so the block does not depend on new module-level imports.
    import webbrowser
    from pathlib import Path

    outfile = os.path.join(TEMP_DIRECTORY, "factura.html")
    Representable.html_write_all(
        objs=cfdis,
        target=outfile,
    )

    preview_path = os.path.abspath(outfile)
    if hasattr(os, "startfile"):
        # Only available on Windows.
        os.startfile(preview_path)
    else:
        webbrowser.open(Path(preview_path).as_uri())
309
310
311
def center_location(element) -> tuple[int, int]:
    """Return ``element``'s current location shifted by a third of its size.

    Each coordinate from ``element.current_location()`` is increased by the
    matching entry of ``element.size`` floor-divided by 3.
    NOTE(review): despite the name, the offset is one third (not one half) of
    the size - confirm this is intended.
    """
    position = element.current_location()
    return tuple(coord + extent // 3 for coord, extent in zip(position, element.size))
313