Passed
Push — main ( a2b71a...74be07 )
by Sat CFDI
01:45
created

satdigitalinvoice.localdb.LocalDB.notified()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
import logging
2
import os
3
import pickle
4
from enum import Enum
5
from uuid import UUID
6
7
import diskcache
8
from satcfdi.accounting import SatCFDI
9
from satcfdi.pacs import sat
10
11
from . import DATA_DIRECTORY
12
from .log_tools import print_yaml
13
from . import PPD
14
15
LIQUIDATED = 0
16
NOTIFIED = 2
17
STATUS_SAT = 3
18
FOLIO = 5
19
SERIE = 6
20
21
sat_manager = sat.SAT()
22
23
logger = logging.getLogger(__name__)
24
25
26
class LocalDB(diskcache.Cache):
27
    def __init__(self):
28
        super().__init__(directory=os.path.join(DATA_DIRECTORY, 'cache'))
29
30
    def folio(self) -> int:
31
        return self.get(FOLIO, 1000)
32
33
    def folio_set(self, value: int):
34
        self[FOLIO] = value
35
36
    def serie(self) -> str:
37
        return self.get(SERIE, 'A')
38
39
    def serie_set(self, value: str):
40
        self[SERIE] = value
41
42
    def liquidated(self, uuid: UUID):
43
        return self.get((LIQUIDATED, uuid))
44
45
    def liquidated_set(self, uuid: UUID, value: bool):
46
        self[(LIQUIDATED, uuid)] = value
47
48
    def notified(self, uuid: UUID):
49
        return self.get((NOTIFIED, uuid))
50
51
    def notified_set(self, uuid: UUID, value: bool):
52
        self[(NOTIFIED, uuid)] = value
53
54
    def status_sat(self, uuid: UUID):
55
        return self.get((STATUS_SAT, uuid), {})
56
57
    def status_sat_set(self, uuid: UUID, value: dict):
58
        if value:
59
            self[(STATUS_SAT, uuid)] = value
60
        else:
61
            try:
62
                del self[(STATUS_SAT, uuid)]
63
            except KeyError:
64
                pass
65
66
67
class LiquidatedState(Enum):
68
    NONE = 1
69
    YES = 2
70
    NO = 3
71
    IGNORED = 4
72
    CANCELLED = 5
73
74
    def __str__(self):
75
        if self.name == "NONE":
76
            return ""
77
        if self.name == "IGNORED":
78
            return "Ignorada"
79
        if self.name == "YES":
80
            return "Si"
81
        if self.name == "NO":
82
            return "No"
83
        if self.name == "CANCELLED":
84
            return "Cancelada"
85
86
87
class LocalDBSatCFDI(LocalDB):
88
    def __init__(self, enviar_a_partir, pagar_a_partir):
89
        super().__init__()
90
        self.enviar_a_partir = enviar_a_partir
91
        self.pagar_a_partir = pagar_a_partir
92
93
    def notified(self, cfdi: SatCFDI):
94
        v = super().notified(cfdi.uuid)
95
        if v is None and cfdi["Fecha"] < self.enviar_a_partir:
96
            return True
97
        return v
98
99
    def notified_flip(self, cfdi: SatCFDI):
100
        v = not self.notified(cfdi)
101
        self.notified_set(cfdi.uuid, v)
102
        return v
103
104
    def liquidated(self, cfdi: SatCFDI):
105
        v = super().liquidated(cfdi.uuid)
106
        if v is None and cfdi["Fecha"] < self.pagar_a_partir[cfdi["MetodoPago"]]:
107
            return True
108
        return v
109
110
    def liquidated_flip(self, cfdi: SatCFDI):
111
        v = not self.liquidated(cfdi)
112
        self.liquidated_set(cfdi.uuid, v)
113
        return v
114
115
    def status_sat(self, cfdi: SatCFDI, update=False):
116
        if update:
117
            res = sat_manager.status(cfdi)
118
            if res["ValidacionEFOS"] == "200":
119
                self.status_sat_set(cfdi.uuid, res)
120
            return res
121
        else:
122
            return super().status_sat(cfdi.uuid)
123
124
    def liquidated_state(self, cfdi: SatCFDI):
125
        if cfdi.estatus == '0':
126
            return LiquidatedState.CANCELLED
127
128
        if cfdi["TipoDeComprobante"] != "I":
129
            return LiquidatedState.NONE
130
131
        mpago = cfdi["MetodoPago"]
132
        if cfdi['Total'] == 0 or (mpago == PPD and cfdi.saldo_pendiente == 0):
133
            return LiquidatedState.YES
134
135
        if self.liquidated(cfdi):
136
            if mpago == PPD:
137
                return LiquidatedState.IGNORED
138
            return LiquidatedState.YES
139
140
        return LiquidatedState.NO
141
142
    def describe(self, cfdi: SatCFDI):
143
        print_yaml({
144
            'saldada': self.liquidated(cfdi),
145
            'enviada': self.notified(cfdi),
146
            'status_sat': self.status_sat(cfdi)
147
        })
148
149
150
def save_data(file, data):
151
    with open(os.path.join(DATA_DIRECTORY, file), 'wb') as f:
152
        pickle.dump(data, f)
153
154
155
def load_data(file, default=None):
156
    try:
157
        with open(os.path.join(DATA_DIRECTORY, file), 'rb') as f:
158
            return pickle.load(f)
159
    except FileNotFoundError:
160
        return default
161