Passed
Push — main ( 831997...bdbfd5 )
by Sat CFDI
01:47
created

satdigitalinvoice.gui_functions.period_desc()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
import logging
2
import os
3
from datetime import datetime, date
4
from decimal import Decimal
5
from decimal import InvalidOperation
6
7
import xlsxwriter
8
from babel.dates import format_date
9
from markdown2 import markdown
10
from satcfdi import DatePeriod
11
from satcfdi.accounting import filter_invoices_iter, filter_payments_iter, invoices_export, payments_export
12
from satcfdi.accounting.process import payments_groupby_receptor, payments_retentions_export
13
from satcfdi.create.cfd import cfdi40
14
from satcfdi.create.cfd.cfdi40 import Comprobante, PagoComprobante
15
from satcfdi.pacs import sat
16
from satcfdi.printer import Representable
17
# noinspection PyUnresolvedReferences
18
from satcfdi.transform.catalog import CATALOGS
19
from weasyprint import HTML, CSS
20
from xlsxwriter.exceptions import FileCreateError
21
22
from . import SOURCE_DIRECTORY, ARCHIVOS_DIRECTORY, TEMP_DIRECTORY
23
from .environments import facturacion_environment
24
from .utils import add_month
25
26
logger = logging.getLogger(__name__)
27
logger.level = logging.INFO
28
29
sat_manager = sat.SAT()
30
31
PERIODICIDAD = {
32
    "Mensual": 1,
33
    "Bimestral": 2,
34
    "Trimestral": 3,
35
    "Cuatrimestral": 4,
36
    "Semestral": 6,
37
    "Anual": 12
38
}
39
40
41
def create_cfdi(receptor_cif, factura_details, emisor_cif):
42
    emisor = cfdi40.Emisor(
43
        rfc=emisor_cif['Rfc'],
44
        nombre=emisor_cif['RazonSocial'],
45
        regimen_fiscal=emisor_cif['RegimenFiscal']
46
    )
47
    emisor = emisor | factura_details.get('Emisor', {})
48
    invoice = cfdi40.Comprobante(
49
        emisor=emisor,
50
        lugar_expedicion=emisor_cif['CodigoPostal'],
51
        receptor=cfdi40.Receptor(
52
            rfc=factura_details['Receptor'],
53
            nombre=receptor_cif['RazonSocial'],
54
            uso_cfdi=factura_details['UsoCFDI'],
55
            domicilio_fiscal_receptor=receptor_cif['CodigoPostal'],
56
            regimen_fiscal_receptor=receptor_cif['RegimenFiscal']
57
        ),
58
        metodo_pago=factura_details['MetodoPago'],
59
        forma_pago=factura_details['FormaPago'],
60
        # serie=serie,
61
        # folio=folio,
62
        conceptos=factura_details["Conceptos"]
63
    )
64
    invoice = invoice.process()
65
66
    expected_total = factura_details.get('Total')
67
    if expected_total is not None:
68
        if expected_total != invoice['Total']:
69
            logger.info(f"{factura_details['Receptor']}: Total '{expected_total}' is invalid, expected '{invoice['Total']}'")
70
            return None
71
72
    return invoice
73
74
75
def parse_periodo_mes_ajuste(periodo_mes_ajuste: str):
76
    parts = periodo_mes_ajuste.split(".")
77
    if len(parts) != 2:
78
        raise ValueError("Periodo Invalido")
79
80
    periodo, mes_ajuste = parts
81
    if not mes_ajuste.isnumeric():
82
        raise ValueError("Periodo Invalido")
83
84
    mes_ajuste = int(mes_ajuste)
85
    if not (12 >= int(mes_ajuste) >= 1):
86
        raise ValueError("Periodo Invalido")
87
88
    if periodo not in PERIODICIDAD:
89
        raise ValueError("Periodo Invalido")
90
91
    return periodo, mes_ajuste
92
93
94
def format_concepto_desc(concepto, periodo):
95
    concepto = concepto.copy()
96
    template = facturacion_environment.from_string(concepto["Descripcion"])
97
    concepto["Descripcion"] = template.render(
98
        periodo=periodo
99
    )
100
    return concepto
101
102
103
def validad_facturas(clients, facturas):
104
    is_valid = True
105
    for factura_details in facturas:
106
        cliente = clients.get(factura_details['Receptor'])
107
        if not cliente:
108
            logger.info(f"{factura_details['Receptor']}: client not found")
109
            is_valid = False
110
111
        for c in factura_details["Conceptos"]:
112
            periodo_mes_ajuste = c.get("_periodo_mes_ajuste", "")
113
            try:
114
                parse_periodo_mes_ajuste(periodo_mes_ajuste)
115
            except ValueError:
116
                logger.info(f"{factura_details['Receptor']}: _periodo_mes_ajuste '{periodo_mes_ajuste}' is invalid")
117
                is_valid = False
118
119
        if factura_details["MetodoPago"] == "PPD" and factura_details["FormaPago"] != "99":
120
            logger.info(f"{factura_details['Receptor']}: FormaPago '{factura_details['FormaPago']}' is invalid, expected '99' for PPD")
121
            is_valid = False
122
123
    return is_valid
124
125
126
def period_desc(dp: DatePeriod):
127
    return format_date(date(year=dp.year, month=dp.month, day=1), locale='es_MX', format="'Mes de' MMMM 'del' y").upper()
128
129
130
def periodicidad_desc(dp: DatePeriod, periodo_mes_ajuste, offset):
131
    periodicidad, mes_ajuste = parse_periodo_mes_ajuste(periodo_mes_ajuste)
132
    periodo_meses = PERIODICIDAD[periodicidad]
133
134
    if (dp.month - mes_ajuste) % periodo_meses == 0:
135
        if offset:
136
            dp = add_month(dp, offset)
137
138
        periodo = period_desc(dp)
139
        if periodo_meses > 1:
140
            periodo += " AL " + period_desc(
141
                dp=add_month(dp, periodo_meses - 1)
142
            )
143
144
        return periodo
145
    return None
146
147
148
def generate_ingresos(clients, facturas, dp, emisor_rfc):
149
    if not validad_facturas(clients, facturas):
150
        return
151
152
    emisor_cif = clients[emisor_rfc]
153
154
    def prepare_concepto(concepto):
155
        periodo = periodicidad_desc(
156
            dp,
157
            concepto['_periodo_mes_ajuste'],
158
            concepto.get('_desfase_mes')
159
        )
160
        if periodo and concepto['ValorUnitario'] is not None:
161
            return format_concepto_desc(concepto, periodo=periodo)
162
163
    def facturas_iter():
164
        i = 0
165
        for f in facturas:
166
            receptor_cif = clients[f['Receptor']]
167
            conceptos = [x for x in (prepare_concepto(c) for c in f["Conceptos"]) if x]
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable c does not seem to be defined.
Loading history...
168
            if conceptos:
169
                f["Conceptos"] = conceptos
170
                yield create_cfdi(receptor_cif, f, emisor_cif)
171
                i += 1
172
173
    cfdis = [i for i in facturas_iter()]
174
175
    if None in cfdis:
176
        return
177
178
    return cfdis
179
180
181
def parse_fecha_pago(fecha_pago):
182
    if not fecha_pago:
183
        raise ValueError("Fecha de Pago es requerida")
184
185
    fecha_pago = datetime.fromisoformat(fecha_pago)
186
    if fecha_pago > datetime.now():
187
        raise ValueError("Fecha de Pago es mayor a la fecha actual")
188
189
    if fecha_pago.replace(hour=12) > datetime.now():
190
        fecha_pago = datetime.now()
191
    else:
192
        fecha_pago = fecha_pago.replace(hour=12)
193
194
    dif = datetime.now() - fecha_pago
195
    if dif.days > 30:
196
        raise ValueError("Fecha de Pago es de hace mas de 30 dias")
197
198
    return fecha_pago
199
200
201
def parse_importe_pago(importe_pago: str):
202
    if not importe_pago:
203
        return None
204
205
    try:
206
        return round(Decimal(importe_pago), 2)
207
    except InvalidOperation:
208
        raise ValueError("Importe de Pago es invalido")
209
210
211
def pago_factura(factura_pagar, fecha_pago: datetime, forma_pago: str, importe_pago: Decimal = None):
212
    c = factura_pagar
213
    invoice = Comprobante.pago_comprobantes(
214
        comprobantes=[
215
            PagoComprobante(
216
                comprobante=c,
217
                num_parcialidad=c.ultima_num_parcialidad + 1,
218
                imp_saldo_ant=c.saldo_pendiente,
219
                imp_pagado=importe_pago
220
            )
221
        ],
222
        fecha_pago=fecha_pago,
223
        forma_pago=forma_pago,
224
    )
225
    return invoice.process()
226
227
228
def find_ajustes(facturas, mes_ajuste):
229
    for f in facturas:
230
        rfc = f["Receptor"]
231
        for concepto in f["Conceptos"]:
232
            _, mes_aj = parse_periodo_mes_ajuste(concepto['_periodo_mes_ajuste'])
233
            if mes_aj == mes_ajuste:
234
                yield rfc, concepto
235
236
237
def archivos_folder(dp: DatePeriod):
238
    if dp.month:
239
        return os.path.join(ARCHIVOS_DIRECTORY, str(dp.year), str(dp.year) + "-{:02d}".format(dp.month))
240
    return os.path.join(ARCHIVOS_DIRECTORY, str(dp.year))
241
242
243
def archivos_filename(dp: DatePeriod, ext="xlsx"):
244
    return os.path.join(archivos_folder(dp), f"{dp}.{ext}")
245
246
247
def exportar_facturas(all_invoices, dp: DatePeriod, emisor_cif, rfc_prediales):
248
    emisor_rfc = emisor_cif['Rfc']
249
    emisor_regimen = emisor_cif['RegimenFiscal']
250
251
    emitidas = filter_invoices_iter(invoices=all_invoices.values(), fecha=dp, rfc_emisor=emisor_rfc)
252
    emitidas_pagos = filter_payments_iter(invoices=all_invoices, fecha=dp, rfc_emisor=emisor_rfc)
253
    emitidas_pagos = list(emitidas_pagos)
254
255
    recibidas = filter_invoices_iter(invoices=all_invoices.values(), fecha=dp, rfc_receptor=emisor_rfc)
256
    recibidas_pagos = filter_payments_iter(invoices=all_invoices, fecha=dp, rfc_receptor=emisor_rfc)
257
258
    recibidas_pagos = list(recibidas_pagos)
259
    pagos_hechos_iva = [
260
        p
261
        for p in recibidas_pagos
262
        if sum(x.get("Importe", 0) for x in p.impuestos.get("Traslados", {}).values()) > 0
263
           and p.comprobante["Receptor"].get("RegimenFiscalReceptor") in (emisor_regimen, None)
264
    ]
265
    prediales = [p for p in recibidas_pagos if p.comprobante["Emisor"]["Rfc"] in rfc_prediales]
266
267
    archivo_excel = archivos_filename(dp)
268
    os.makedirs(os.path.dirname(archivo_excel), exist_ok=True)
269
270
    workbook = xlsxwriter.Workbook(archivo_excel)
271
    # EMITIDAS
272
    invoices_export(workbook, "EMITIDAS", emitidas)
273
    payments_export(workbook, "EMITIDAS PAGOS", emitidas_pagos)
274
275
    # RECIBIDAS
276
    invoices_export(workbook, "RECIBIDAS", recibidas)
277
    payments_export(workbook, "RECIBIDAS PAGOS", recibidas_pagos)
278
279
    # SPECIALES
280
    payments_export(workbook, f"RECIBIDAS PAGOS IVA {emisor_regimen.code}", pagos_hechos_iva)
281
    if prediales:
282
        payments_export(workbook, "PREDIALES", prediales)
283
284
    # RETENCIONES
285
    if dp.month is None:
286
        archivo_retenciones = archivos_filename(dp, ext="retenciones.txt")
287
        pagos_agrupados = payments_groupby_receptor(emitidas_pagos)
288
        payments_retentions_export(archivo_retenciones, pagos_agrupados)
289
290
    try:
291
        workbook.close()
292
        return archivo_excel
293
    except FileCreateError:
294
        return None
295
296
297
def generate_pdf_template(template_name, fields):
298
    increment_template = facturacion_environment.get_template(template_name)
299
    md5_document = increment_template.render(
300
        fields
301
    )
302
    html = markdown(md5_document)
303
    pdf = HTML(string=html).write_pdf(
304
        target=None,
305
        stylesheets=[
306
            os.path.join(SOURCE_DIRECTORY, "markdown_styles", "markdown6.css"),
307
            CSS(
308
                string='@page { width: Letter; margin: 1.6cm 1.6cm 1.6cm 1.6cm; }'
309
            )
310
        ]
311
    )
312
    return pdf
313
314
315
def mf_pago_fmt(cfdi):
316
    i = cfdi
317
    if i['TipoDeComprobante'] == "I":
318
        return i['TipoDeComprobante'].code + ' ' + i['MetodoPago'].code + ' ' + (i['FormaPago'].code if i['FormaPago'].code != '99' else '')
319
    return i['TipoDeComprobante'].code
320
321
322
def ajustes_directory(dp: DatePeriod):
323
    return os.path.join(archivos_folder(dp), 'ajustes')
324
325
326
def preview_cfdis(cfdis):
327
    outfile = os.path.join(TEMP_DIRECTORY, "factura.html")
328
    Representable.html_write_all(
329
        objs=cfdis,
330
        target=outfile,
331
    )
332
    os.startfile(
333
        os.path.abspath(outfile)
334
    )
335