Passed
Push — main ( b86ad9...84a75d )
by Sat CFDI
01:47
created

satdigitalinvoice.gui_functions.exportar_facturas()   A

Complexity

Conditions 3

Size

Total Lines 45
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 31
dl 0
loc 45
rs 9.1359
c 0
b 0
f 0
cc 3
nop 4
1
import logging
2
import os
3
from datetime import datetime, date
4
from decimal import Decimal
5
from decimal import InvalidOperation
6
from typing import Sequence
7
8
import xlsxwriter
9
from markdown2 import markdown
10
from satcfdi import DatePeriod
11
from satcfdi.accounting import filter_invoices_iter, filter_payments_iter, invoices_export, payments_export
12
from satcfdi.accounting.process import payments_groupby_receptor, payments_retentions_export
13
from satcfdi.create.cfd import cfdi40
14
from satcfdi.create.cfd.cfdi40 import Comprobante, PagoComprobante
15
from satcfdi.pacs import sat
16
from satcfdi.printer import Representable
17
# noinspection PyUnresolvedReferences
18
from satcfdi.transform.catalog import CATALOGS
19
from weasyprint import HTML, CSS
20
21
from . import SOURCE_DIRECTORY, ARCHIVOS_DIRECTORY, TEMP_DIRECTORY
22
from .environments import facturacion_environment
23
from .exceptions import ConsoleErrors
24
from .formatting_functions.common import fecha_mes
25
from .utils import add_month
26
27
logger = logging.getLogger(__name__)
28
logger.level = logging.INFO
29
30
sat_manager = sat.SAT()
31
32
PERIODICIDAD = {
33
    "Mensual": 1,
34
    "Bimestral": 2,
35
    "Trimestral": 3,
36
    "Cuatrimestral": 4,
37
    "Semestral": 6,
38
    "Anual": 12
39
}
40
CALENDAR_FECHA_FMT = '%Y-%m-%d'
41
42
43
def create_cfdi(receptor_cif, factura_details, emisor_cif):
44
    emisor = cfdi40.Emisor(
45
        rfc=emisor_cif['Rfc'],
46
        nombre=emisor_cif['RazonSocial'],
47
        regimen_fiscal=emisor_cif['RegimenFiscal']
48
    )
49
    emisor = emisor | factura_details.get('Emisor', {})
50
    invoice = cfdi40.Comprobante(
51
        emisor=emisor,
52
        lugar_expedicion=emisor_cif['CodigoPostal'],
53
        receptor=cfdi40.Receptor(
54
            rfc=factura_details['Receptor'],
55
            nombre=receptor_cif['RazonSocial'],
56
            uso_cfdi=factura_details['UsoCFDI'],
57
            domicilio_fiscal_receptor=receptor_cif['CodigoPostal'],
58
            regimen_fiscal_receptor=receptor_cif['RegimenFiscal']
59
        ),
60
        metodo_pago=factura_details['MetodoPago'],
61
        forma_pago=factura_details['FormaPago'],
62
        # serie=serie,
63
        # folio=folio,
64
        conceptos=factura_details["Conceptos"]
65
    )
66
    invoice = invoice.process()
67
    return invoice
68
69
70
def parse_periodo_mes_ajuste(periodo_mes_ajuste: str):
71
    parts = periodo_mes_ajuste.split(".")
72
    if len(parts) != 2:
73
        raise ValueError("Periodo Invalido")
74
75
    periodo, mes_ajuste = parts
76
    if not mes_ajuste.isnumeric():
77
        raise ValueError("Periodo Invalido")
78
79
    mes_ajuste = int(mes_ajuste)
80
    if not (12 >= int(mes_ajuste) >= 1):
81
        raise ValueError("Periodo Invalido")
82
83
    if periodo not in PERIODICIDAD:
84
        raise ValueError("Periodo Invalido")
85
86
    return PERIODICIDAD[periodo], mes_ajuste
87
88
89
def format_concepto_desc(concepto, periodo):
90
    concepto = concepto.copy()
91
    template = facturacion_environment.from_string(concepto["Descripcion"])
92
    concepto["Descripcion"] = template.render(
93
        periodo=periodo
94
    )
95
    return concepto
96
97
98
def period_desc(dp: DatePeriod):
99
    return fecha_mes(date(year=dp.year, month=dp.month, day=1))
100
101
102
def periodicidad_desc(dp: DatePeriod, periodo_mes_ajuste, offset):
103
    periodo_meses, mes_ajuste = parse_periodo_mes_ajuste(periodo_mes_ajuste)
104
105
    if (dp.month - mes_ajuste) % periodo_meses == 0:
106
        if offset:
107
            dp = add_month(dp, offset)
108
109
        periodo = period_desc(dp)
110
        if periodo_meses > 1:
111
            periodo += " AL " + period_desc(
112
                dp=add_month(dp, periodo_meses - 1)
113
            )
114
115
        return periodo
116
    return None
117
118
119
def generate_ingresos(clients, facturas, dp, emisor_rfc):
120
    emisor_cif = clients[emisor_rfc]
121
    errors = []
122
123
    def facturas_iter():
124
        for i, f in enumerate(facturas, start=1):
125
            try:
126
                def prepare_concepto(c):
127
                    periodo = periodicidad_desc(
128
                        dp,
129
                        c['_periodo_mes_ajuste'],
130
                        c.get('_desfase_mes')
131
                    )
132
                    if periodo and c['ValorUnitario'] is not None:
133
                        return format_concepto_desc(c, periodo=periodo)
134
135
                receptor_cif = clients.get(f['Receptor'])
136
                if not receptor_cif:
137
                    raise ValueError("client not found")
138
139
                if f["MetodoPago"] == "PPD" and f["FormaPago"] != "99":
140
                    raise ValueError(f"FormaPago '{f['FormaPago']}' is invalid, expected '99' for PPD")
141
142
                if conceptos := [x for x in (prepare_concepto(c) for c in f["Conceptos"]) if x]:
143
                    f["Conceptos"] = conceptos
144
                    cfdi = create_cfdi(receptor_cif, f, emisor_cif)
145
146
                    expected_total = f.get('Total')
147
                    if expected_total is not None and expected_total != cfdi['Total']:
148
                        raise ValueError(f"Total '{expected_total}' is invalid, expected '{cfdi['Total']}'")
149
150
                    yield cfdi
151
            except Exception as e:
152
                errors.append(f"{i} {f['Receptor']}: {str(e)}")
153
154
    cfdis = list(facturas_iter())
155
    if errors:
156
        raise ConsoleErrors(errors=errors)
157
    return cfdis
158
159
160
def parse_fecha_pago(fecha_pago):
161
    if not fecha_pago:
162
        raise ValueError("Fecha de Pago es requerida")
163
164
    fecha_pago = datetime.strptime(fecha_pago, CALENDAR_FECHA_FMT)
165
    if fecha_pago > datetime.now():
166
        raise ValueError("Fecha de Pago es mayor a la fecha actual")
167
168
    if fecha_pago.replace(hour=12) > datetime.now():
169
        fecha_pago = datetime.now()
170
    else:
171
        fecha_pago = fecha_pago.replace(hour=12)
172
173
    dif = datetime.now() - fecha_pago
174
    if dif.days > 35:
175
        raise ValueError("Fecha de Pago es de hace mas de 35 dias")
176
177
    return fecha_pago
178
179
180
def parse_importe_pago(importe_pago: str):
181
    try:
182
        return round(Decimal(importe_pago), 2)
183
    except InvalidOperation:
184
        raise ValueError("Importe de Pago es invalido")
185
186
187
def pago_factura(factura_pagar, fecha_pago: datetime, forma_pago: str, importe_pago: Decimal = None):
188
    c = factura_pagar
189
    invoice = Comprobante.pago_comprobantes(
190
        comprobantes=[
191
            PagoComprobante(
192
                comprobante=c,
193
                num_parcialidad=c.ultima_num_parcialidad + 1,
194
                imp_saldo_ant=c.saldo_pendiente,
195
                imp_pagado=importe_pago
196
            )
197
        ],
198
        fecha_pago=fecha_pago,
199
        forma_pago=forma_pago,
200
    )
201
    return invoice.process()
202
203
204
def find_ajustes(facturas, mes_ajuste):
205
    for f in facturas:
206
        rfc = f["Receptor"]
207
        for concepto in f["Conceptos"]:
208
            _, mes_aj = parse_periodo_mes_ajuste(concepto['_periodo_mes_ajuste'])
209
            if mes_aj == mes_ajuste:
210
                yield rfc, concepto
211
212
213
def archivos_folder(dp: DatePeriod):
214
    if dp.month:
215
        return os.path.join(ARCHIVOS_DIRECTORY, str(dp.year), str(dp.year) + "-{:02d}".format(dp.month))
216
    return os.path.join(ARCHIVOS_DIRECTORY, str(dp.year))
217
218
219
def archivos_filename(dp: DatePeriod, ext="xlsx"):
220
    return os.path.join(archivos_folder(dp), f"{dp}.{ext}")
221
222
223
def exportar_facturas(all_invoices, dp: DatePeriod, emisor_cif, rfc_prediales):
224
    emisor_rfc = emisor_cif['Rfc']
225
    emisor_regimen = emisor_cif['RegimenFiscal']
226
227
    emitidas = filter_invoices_iter(invoices=all_invoices.values(), fecha=dp, rfc_emisor=emisor_rfc)
228
    emitidas_pagos = filter_payments_iter(invoices=all_invoices, fecha=dp, rfc_emisor=emisor_rfc)
229
    emitidas_pagos = list(emitidas_pagos)
230
231
    recibidas = filter_invoices_iter(invoices=all_invoices.values(), fecha=dp, rfc_receptor=emisor_rfc)
232
    recibidas_pagos = filter_payments_iter(invoices=all_invoices, fecha=dp, rfc_receptor=emisor_rfc)
233
234
    recibidas_pagos = list(recibidas_pagos)
235
    pagos_hechos_iva = [
236
        p
237
        for p in recibidas_pagos
238
        if sum(x.get("Importe", 0) for x in p.impuestos.get("Traslados", {}).values()) > 0
239
           and p.comprobante["Receptor"].get("RegimenFiscalReceptor") in (emisor_regimen, None)
240
    ]
241
    prediales = [p for p in recibidas_pagos if p.comprobante["Emisor"]["Rfc"] in rfc_prediales]
242
243
    archivo_excel = archivos_filename(dp)
244
    os.makedirs(os.path.dirname(archivo_excel), exist_ok=True)
245
246
    workbook = xlsxwriter.Workbook(archivo_excel)
247
    # EMITIDAS
248
    invoices_export(workbook, "EMITIDAS", emitidas)
249
    payments_export(workbook, "EMITIDAS PAGOS", emitidas_pagos)
250
251
    # RECIBIDAS
252
    invoices_export(workbook, "RECIBIDAS", recibidas)
253
    payments_export(workbook, "RECIBIDAS PAGOS", recibidas_pagos)
254
255
    # SPECIALES
256
    payments_export(workbook, f"RECIBIDAS PAGOS IVA {emisor_regimen.code}", pagos_hechos_iva)
257
    if prediales:
258
        payments_export(workbook, "PREDIALES", prediales)
259
260
    # RETENCIONES
261
    if dp.month is None:
262
        archivo_retenciones = archivos_filename(dp, ext="retenciones.txt")
263
        pagos_agrupados = payments_groupby_receptor(emitidas_pagos)
264
        payments_retentions_export(archivo_retenciones, pagos_agrupados)
265
266
    workbook.close()
267
    return archivo_excel
268
269
270
def generate_pdf_template(template_name, fields):
271
    increment_template = facturacion_environment.get_template(template_name)
272
    md5_document = increment_template.render(
273
        fields
274
    )
275
    html = markdown(md5_document)
276
    pdf = HTML(string=html).write_pdf(
277
        target=None,
278
        stylesheets=[
279
            os.path.join(SOURCE_DIRECTORY, "markdown_styles", "markdown6.css"),
280
            CSS(
281
                string='@page { width: Letter; margin: 1.6cm 1.6cm 1.6cm 1.6cm; }'
282
            )
283
        ]
284
    )
285
    return pdf
286
287
288
def mf_pago_fmt(cfdi):
289
    i = cfdi
290
    if i['TipoDeComprobante'] == "I":
291
        return i['TipoDeComprobante'].code + ' ' + i['MetodoPago'].code + ' ' + (i['FormaPago'].code if i['FormaPago'].code != '99' else '  ')
292
    return i['TipoDeComprobante'].code + '       '
293
294
295
def ajustes_directory(dp: DatePeriod):
296
    return os.path.join(archivos_folder(dp), 'ajustes')
297
298
299
def preview_cfdis(cfdis):
300
    outfile = os.path.join(TEMP_DIRECTORY, "factura.html")
301
    Representable.html_write_all(
302
        objs=cfdis,
303
        target=outfile,
304
    )
305
    os.startfile(
306
        os.path.abspath(outfile)
307
    )
308
309
310
def center_location(element) -> tuple[int, int]:
311
    return tuple((c + s // 3) for c, s in zip(element.current_location(), element.size))
312