Passed
Push — main ( 5e9c42...ed9591 )
by Sat CFDI
01:44
created

LocalDBSatCFDI.notificar_flip()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
import logging
2
import os
3
import pickle
4
from uuid import UUID
5
6
import diskcache
7
from satcfdi.accounting import SatCFDI
8
from satcfdi.pacs import sat
9
10
from . import DATA_DIRECTORY
11
from .log_tools import print_yaml
12
13
# Namespace tags used as the first element of the cache keys below
# (``(TAG, uuid)`` tuples, or the bare tag for FOLIO). They end up inside keys
# written to the on-disk cache, so renumbering them would orphan stored data.
EMAIL_NOTIFICADA = 2
STATUS_SAT = 3
PENDIENTE = 4
FOLIO = 5
17
18
# Shared ``sat.SAT`` instance, created once at import time; its ``status``
# method is what LocalDBSatCFDI.status_sat calls when asked to refresh.
sat_manager = sat.SAT()
19
20
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
21
22
23
class LocalDB(diskcache.Cache):
    """Disk-backed store for the folio counter and per-UUID state.

    Entries live in a ``diskcache.Cache`` rooted at ``DATA_DIRECTORY/cache``.
    Per-UUID entries are keyed by ``(TAG, uuid)`` tuples. The two flags
    (``saldar`` and ``notificar``) are only stored when falsy: a missing entry
    reads back as ``True``, and setting a truthy value deletes the entry.
    """

    def __init__(self):
        super().__init__(directory=os.path.join(DATA_DIRECTORY, 'cache'))

    def _forget(self, key):
        """Delete *key* from the cache; a key that is not stored is ignored."""
        try:
            del self[key]
        except KeyError:
            pass

    def folio(self):
        """Return the stored folio, or ``1`` when none has been stored."""
        return self.get(FOLIO, 1)

    def folio_set(self, value: int):
        """Store *value* as the folio."""
        self[FOLIO] = value

    def saldar(self, uuid: UUID):
        """Return the ``saldar`` flag for *uuid* (``True`` when not stored)."""
        return self.get(
            (PENDIENTE, uuid), True
        )

    def saldar_set(self, uuid: UUID, value: bool):
        """Set the ``saldar`` flag for *uuid*.

        A truthy *value* is the default state, so the entry is removed
        instead of written; a falsy *value* is stored as given.
        """
        if value:
            self._forget((PENDIENTE, uuid))
        else:
            self[(PENDIENTE, uuid)] = value

    def notificar(self, uuid: UUID):
        """Return the ``notificar`` flag for *uuid* (``True`` when not stored)."""
        return self.get(
            (EMAIL_NOTIFICADA, uuid), True
        )

    def notificar_set(self, uuid: UUID, value: bool):
        """Set the ``notificar`` flag for *uuid*.

        A truthy *value* is the default state, so the entry is removed
        instead of written; a falsy *value* is stored as given.
        """
        if value:
            self._forget((EMAIL_NOTIFICADA, uuid))
        else:
            self[(EMAIL_NOTIFICADA, uuid)] = value

    def status_sat(self, uuid: UUID):
        """Return the status dict stored for *uuid*, or ``{}`` when absent."""
        return self.get(
            (STATUS_SAT, uuid), {}
        )

    def status_sat_set(self, uuid: UUID, value: dict):
        """Store *value* as the status for *uuid*.

        A falsy *value* (e.g. an empty dict) removes the entry, which makes
        ``status_sat`` fall back to its ``{}`` default.
        """
        if value:
            self[(STATUS_SAT, uuid)] = value
        else:
            self._forget((STATUS_SAT, uuid))
74
75
76
class LocalDBSatCFDI(LocalDB):
    """``LocalDB`` front-end that takes ``SatCFDI`` objects instead of UUIDs.

    The read methods apply cut-off values before consulting the stored flags:

    * ``enviar_a_partir`` is compared against ``cfdi["Fecha"]`` in
      ``notificar``.
    * ``saldar_a_partir`` is indexed by ``cfdi["MetodoPago"]`` and the result
      is compared against ``cfdi["Fecha"]`` in ``saldar``.
    """

    def __init__(self, enviar_a_partir, saldar_a_partir):
        super().__init__()
        self.enviar_a_partir = enviar_a_partir
        self.saldar_a_partir = saldar_a_partir

    def notificar(self, cfdi: SatCFDI):
        """Return the ``notificar`` flag for *cfdi*.

        Always ``False`` when ``cfdi["Fecha"]`` is earlier than
        ``enviar_a_partir``; otherwise the stored flag (``True`` by default).
        """
        if cfdi["Fecha"] >= self.enviar_a_partir:
            return super().notificar(cfdi.uuid)
        return False

    def notificar_flip(self, cfdi: SatCFDI):
        """Store the negation of ``notificar(cfdi)`` and return that value."""
        v = not self.notificar(cfdi)
        self.notificar_set(cfdi.uuid, v)
        return v

    def saldar(self, cfdi: SatCFDI):
        """Return the amount to report for *cfdi*.

        * ``None`` when ``TipoDeComprobante`` is not ``"I"``.
        * ``0`` when the method is ``"PPD"`` and ``saldo_pendiente`` is 0,
          when ``Fecha`` is earlier than the cut-off for its ``MetodoPago``,
          or when the stored ``saldar`` flag is falsy.
        * Otherwise ``cfdi.saldo_pendiente`` for ``"PPD"`` and
          ``cfdi["Total"]`` for any other ``MetodoPago``.
        """
        if cfdi["TipoDeComprobante"] != "I":
            return None
        if cfdi["MetodoPago"] == "PPD" and cfdi.saldo_pendiente == 0:
            return 0
        if cfdi["Fecha"] >= self.saldar_a_partir[cfdi["MetodoPago"]]:
            if super().saldar(cfdi.uuid):
                if cfdi["MetodoPago"] == "PPD":
                    return cfdi.saldo_pendiente
                else:
                    return cfdi["Total"]
        return 0

    def saldar_flip(self, cfdi: SatCFDI):
        """Store the negation of ``bool(saldar(cfdi))`` and return that value.

        ``saldar`` returns an amount, ``0`` or ``None``; only its truthiness
        is used here.
        """
        v = not self.saldar(cfdi)
        self.saldar_set(cfdi.uuid, v)
        return v

    def status_sat(self, cfdi: SatCFDI, update=False):
        """Return the status for *cfdi*.

        With ``update=False`` the stored status is returned (``{}`` when
        none is stored). With ``update=True`` the status is fetched through
        ``sat_manager.status`` and returned; it is only written to the store
        when its ``"ValidacionEFOS"`` entry equals ``"200"``.
        """
        if update:
            res = sat_manager.status(cfdi)
            if res["ValidacionEFOS"] == "200":
                self.status_sat_set(cfdi.uuid, res)
            return res
        else:
            return super().status_sat(cfdi.uuid)

    def describe(self, cfdi: SatCFDI):
        """Print the ``saldar``, ``notificar`` and stored status of *cfdi*."""
        print_yaml({
            'saldar': self.saldar(cfdi),
            'enviar': self.notificar(cfdi),
            'status_sat': self.status_sat(cfdi)
        })
125
126
127
def save_data(file, data):
    """Pickle *data* into ``DATA_DIRECTORY/file``, overwriting any old file.

    Args:
        file: File name, joined onto ``DATA_DIRECTORY``.
        data: Any picklable object.
    """
    with open(os.path.join(DATA_DIRECTORY, file), 'wb') as f:
        pickle.dump(data, f)
130
131
132
def load_data(file, default=None):
    """Unpickle and return the contents of ``DATA_DIRECTORY/file``.

    Args:
        file: File name, joined onto ``DATA_DIRECTORY``.
        default: Value returned when the file does not exist.

    Only a missing file is handled; any other error raised while opening
    or unpickling propagates to the caller.
    """
    try:
        with open(os.path.join(DATA_DIRECTORY, file), 'rb') as f:
            # NOTE: pickle can execute arbitrary code while loading. This is
            # only safe for files this application wrote itself (see
            # save_data); never point it at data from an untrusted source.
            return pickle.load(f)
    except FileNotFoundError:
        return default
138