Passed
Push — main ( fa2761...0dabbf )
by Sat CFDI
01:53
created

satdigitalinvoice.gui_functions.archivos_folder()   A

Complexity

Conditions 2

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
import logging
2
import os
3
from datetime import datetime, date
4
from decimal import Decimal
5
from uuid import uuid4
6
7
import xlsxwriter
8
from babel.dates import format_date
9
from markdown2 import markdown
10
from satcfdi import DatePeriod
11
from satcfdi.accounting import filter_invoices_iter, filter_payments_iter, invoices_export, payments_export
12
from satcfdi.accounting.process import payments_groupby_receptor, payments_retentions_export
13
from satcfdi.create.cfd import cfdi40
14
from satcfdi.create.cfd.cfdi40 import Comprobante, PagoComprobante
15
from satcfdi.pacs import sat
16
# noinspection PyUnresolvedReferences
17
from satcfdi.transform.catalog import CATALOGS
18
from weasyprint import HTML, CSS
19
from xlsxwriter.exceptions import FileCreateError
20
21
from . import SOURCE_DIRECTORY, ARCHIVOS_DIRECTORY, TEMP_DIRECTORY
22
from .environments import facturacion_environment
23
from .utils import add_month
24
25
logger = logging.getLogger(__name__)
26
logger.level = logging.INFO
27
28
sat_manager = sat.SAT()
29
30
PERIODOS = {
31
    "Mensual": 1,
32
    "Bimestral": 2,
33
    "Trimestral": 3,
34
    "Cuatrimestral": 4,
35
    "Semestral": 6,
36
    "Anual": 12
37
}
38
39
40
def create_cfdi(receptor_cif, factura_details, emisor_cif):
41
    emisor = cfdi40.Emisor(
42
        rfc=emisor_cif['Rfc'],
43
        nombre=emisor_cif['RazonSocial'],
44
        regimen_fiscal=emisor_cif['RegimenFiscal']
45
    )
46
    emisor = emisor | factura_details.get('Emisor', {})
47
    invoice = cfdi40.Comprobante(
48
        emisor=emisor,
49
        lugar_expedicion=emisor_cif['CodigoPostal'],
50
        receptor=cfdi40.Receptor(
51
            rfc=factura_details['Receptor'],
52
            nombre=receptor_cif['RazonSocial'],
53
            uso_cfdi=factura_details['UsoCFDI'],
54
            domicilio_fiscal_receptor=receptor_cif['CodigoPostal'],
55
            regimen_fiscal_receptor=receptor_cif['RegimenFiscal']
56
        ),
57
        metodo_pago=factura_details['MetodoPago'],
58
        forma_pago=factura_details['FormaPago'],
59
        # serie=serie,
60
        # folio=folio,
61
        conceptos=factura_details["Conceptos"]
62
    )
63
    invoice = invoice.process()
64
65
    expected_total = factura_details.get('Total')
66
    if expected_total is not None:
67
        if expected_total != invoice['Total']:
68
            logger.info(f"{factura_details['Receptor']}: Total '{expected_total}' is invalid, expected '{invoice['Total']}'")
69
            return None
70
71
    return invoice
72
73
74
def parse_periodo_mes_ajuste(periodo_mes_ajuste: str):
75
    parts = periodo_mes_ajuste.split(".")
76
    if len(parts) != 2:
77
        raise ValueError("Periodo Invalido")
78
79
    periodo, mes_ajuste = parts
80
    if not mes_ajuste.isnumeric():
81
        raise ValueError("Periodo Invalido")
82
83
    mes_ajuste = int(mes_ajuste)
84
    if not (12 >= int(mes_ajuste) >= 1):
85
        raise ValueError("Periodo Invalido")
86
87
    if periodo not in PERIODOS:
88
        raise ValueError("Periodo Invalido")
89
90
    return periodo, mes_ajuste
91
92
93
def format_concepto_desc(concepto, periodo):
94
    concepto = concepto.copy()
95
    template = facturacion_environment.from_string(concepto["Descripcion"])
96
    concepto["Descripcion"] = template.render(
97
        periodo=periodo
98
    )
99
    return concepto
100
101
102
def validad_facturas(clients, facturas):
103
    is_valid = True
104
    for factura_details in facturas:
105
        cliente = clients.get(factura_details['Receptor'])
106
        if not cliente:
107
            logger.info(f"{factura_details['Receptor']}: client not found")
108
            is_valid = False
109
110
        for c in factura_details["Conceptos"]:
111
            periodo_mes_ajuste = c.get("_periodo_mes_ajuste", "")
112
            try:
113
                parse_periodo_mes_ajuste(periodo_mes_ajuste)
114
            except ValueError:
115
                logger.info(f"{factura_details['Receptor']}: _periodo_mes_ajuste '{periodo_mes_ajuste}' is invalid")
116
                is_valid = False
117
118
        if factura_details["MetodoPago"] == "PPD" and factura_details["FormaPago"] != "99":
119
            logger.info(f"{factura_details['Receptor']}: FormaPago '{factura_details['FormaPago']}' is invalid, expected '99' for PPD")
120
            is_valid = False
121
122
    return is_valid
123
124
125
def year_month_desc(dp: DatePeriod):
126
    return format_date(date(year=dp.year, month=dp.month, day=1), locale='es_MX', format="'Mes de' MMMM 'del' y").upper()
127
128
129
def periodo_desc(dp: DatePeriod, periodo_mes_ajuste, offset):
130
    periodo, mes_ajuste = parse_periodo_mes_ajuste(periodo_mes_ajuste)
131
    periodo_meses = PERIODOS[periodo]
132
133
    if (dp.month - mes_ajuste) % periodo_meses == 0:
134
        if offset:
135
            dp = add_month(dp, offset)
136
137
        periodo = year_month_desc(dp)
138
        if periodo_meses > 1:
139
            periodo += " AL " + year_month_desc(
140
                dp=add_month(dp, periodo_meses - 1)
141
            )
142
143
        return periodo
144
    return None
145
146
147
def generate_ingresos(clients, facturas, dp, emisor_rfc):
148
    if not validad_facturas(clients, facturas):
149
        return
150
151
    emisor_cif = clients[emisor_rfc]
152
153
    def prepare_concepto(concepto):
154
        periodo = periodo_desc(
155
            dp,
156
            concepto['_periodo_mes_ajuste'],
157
            concepto.get('_desfase_mes')
158
        )
159
        if periodo and concepto['ValorUnitario'] is not None:
160
            return format_concepto_desc(concepto, periodo=periodo)
161
162
    def facturas_iter():
163
        i = 0
164
        for f in facturas:
165
            receptor_cif = clients[f['Receptor']]
166
            conceptos = [x for x in (prepare_concepto(c) for c in f["Conceptos"]) if x]
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable c does not seem to be defined.
Loading history...
167
            if conceptos:
168
                f["Conceptos"] = conceptos
169
                yield create_cfdi(receptor_cif, f, emisor_cif)
170
                i += 1
171
172
    cfdis = [i for i in facturas_iter()]
173
174
    if None in cfdis:
175
        return
176
177
    return cfdis
178
179
180
def parse_fecha_pago(fecha_pago):
181
    if not fecha_pago:
182
        raise ValueError("Fecha de Pago es requerida")
183
184
    fecha_pago = datetime.fromisoformat(fecha_pago)
185
    if fecha_pago > datetime.now():
186
        raise ValueError("Fecha de Pago es mayor a la fecha actual")
187
188
    if fecha_pago.replace(hour=12) > datetime.now():
189
        fecha_pago = datetime.now()
190
    else:
191
        fecha_pago = fecha_pago.replace(hour=12)
192
193
    dif = datetime.now() - fecha_pago
194
    if dif.days > 30:
195
        raise ValueError("Fecha de Pago es de hace mas de 30 dias")
196
197
    return fecha_pago
198
199
200
def pago_factura(factura_pagar, fecha_pago: datetime, importe_pago, forma_pago):
201
    c = factura_pagar
202
203
    if importe_pago:
204
        importe_pago = round(Decimal(importe_pago), 2)
205
    else:
206
        importe_pago = c.saldo_pendiente
207
208
    invoice = Comprobante.pago_comprobantes(
209
        comprobantes=[
210
            PagoComprobante(
211
                comprobante=c,
212
                num_parcialidad=c.ultima_num_parcialidad + 1,
213
                imp_saldo_ant=c.saldo_pendiente,
214
                imp_pagado=importe_pago
215
            )
216
        ],
217
        fecha_pago=fecha_pago,
218
        forma_pago=forma_pago,
219
    )
220
    return invoice.process()
221
222
223
def find_ajustes(facturas, mes_ajuste):
224
    for f in facturas:
225
        rfc = f["Receptor"]
226
        for concepto in f["Conceptos"]:
227
            _, mes_aj = parse_periodo_mes_ajuste(concepto['_periodo_mes_ajuste'])
228
            if mes_aj == mes_ajuste:
229
                yield rfc, concepto
230
231
232
def archivos_folder(dp: DatePeriod):
233
    if dp.month:
234
        return os.path.join(ARCHIVOS_DIRECTORY, str(dp.year), str(dp.year) + "-{:02d}".format(dp.month))
235
    return os.path.join(ARCHIVOS_DIRECTORY, str(dp.year))
236
237
238
def archivos_filename(dp: DatePeriod, ext="xlsx"):
239
    return os.path.join(archivos_folder(dp), f"{dp}.{ext}")
240
241
242
def exportar_facturas(all_invoices, dp: DatePeriod, emisor_cif, rfc_prediales):
243
    emisor_rfc = emisor_cif['Rfc']
244
    emisor_regimen = emisor_cif['RegimenFiscal']
245
246
    emitidas = filter_invoices_iter(invoices=all_invoices.values(), fecha=dp, rfc_emisor=emisor_rfc)
247
    emitidas_pagos = filter_payments_iter(invoices=all_invoices, fecha=dp, rfc_emisor=emisor_rfc)
248
    emitidas_pagos = list(emitidas_pagos)
249
250
    recibidas = filter_invoices_iter(invoices=all_invoices.values(), fecha=dp, rfc_receptor=emisor_rfc)
251
    recibidas_pagos = filter_payments_iter(invoices=all_invoices, fecha=dp, rfc_receptor=emisor_rfc)
252
253
    recibidas_pagos = list(recibidas_pagos)
254
    pagos_hechos_iva = [
255
        p
256
        for p in recibidas_pagos
257
        if sum(x.get("Importe", 0) for x in p.impuestos.get("Traslados", {}).values()) > 0
258
           and p.comprobante["Receptor"].get("RegimenFiscalReceptor") in (emisor_regimen, None)
259
    ]
260
    prediales = [p for p in recibidas_pagos if p.comprobante["Emisor"]["Rfc"] in rfc_prediales]
261
262
    archivo_excel = archivos_filename(dp)
263
    os.makedirs(os.path.dirname(archivo_excel), exist_ok=True)
264
265
    workbook = xlsxwriter.Workbook(archivo_excel)
266
    # EMITIDAS
267
    invoices_export(workbook, "EMITIDAS", emitidas)
268
    payments_export(workbook, "EMITIDAS PAGOS", emitidas_pagos)
269
270
    # RECIBIDAS
271
    invoices_export(workbook, "RECIBIDAS", recibidas)
272
    payments_export(workbook, "RECIBIDAS PAGOS", recibidas_pagos)
273
274
    # SPECIALES
275
    payments_export(workbook, f"RECIBIDAS PAGOS IVA {emisor_regimen.code}", pagos_hechos_iva)
276
    if prediales:
277
        payments_export(workbook, "PREDIALES", prediales)
278
279
    # RETENCIONES
280
    if dp.month is None:
281
        archivo_retenciones = archivos_filename(dp, ext="retenciones.txt")
282
        pagos_agrupados = payments_groupby_receptor(emitidas_pagos)
283
        payments_retentions_export(archivo_retenciones, pagos_agrupados)
284
285
    try:
286
        workbook.close()
287
        return archivo_excel
288
    except FileCreateError:
289
        return None
290
291
292
def generate_pdf_template(template_name, fields):
293
    increment_template = facturacion_environment.get_template(template_name)
294
    md5_document = increment_template.render(
295
        fields
296
    )
297
    html = markdown(md5_document)
298
    pdf = HTML(string=html).write_pdf(
299
        target=None,
300
        stylesheets=[
301
            os.path.join(SOURCE_DIRECTORY, "markdown_styles", "markdown6.css"),
302
            CSS(
303
                string='@page { width: Letter; margin: 1.6cm 1.6cm 1.6cm 1.6cm; }'
304
            )
305
        ]
306
    )
307
    return pdf
308
309
310
def generate_html_template(template_name, fields):
311
    increment_template = facturacion_environment.get_template(template_name)
312
    render = increment_template.render(
313
        fields
314
    )
315
    return render
316
317
318
def mf_pago_fmt(cfdi):
319
    i = cfdi
320
    if i['TipoDeComprobante'] == "I":
321
        return i['TipoDeComprobante'].code + ' ' + i['MetodoPago'].code + ' ' + (i['FormaPago'].code if i['FormaPago'].code != '99' else '')
322
    return i['TipoDeComprobante'].code
323
324
325
def ajustes_directory(dp: DatePeriod):
326
    return os.path.join(archivos_folder(dp), 'ajustes')
327