Passed
Push — main ( c270b1...657d48 )
by Sat CFDI
01:50
created

satdigitalinvoice.local.load_data()   A

Complexity

Conditions 3

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 3
nop 2
1
import logging
2
import os
3
import pickle
4
from uuid import UUID
5
6
import diskcache
7
from satcfdi.accounting import SatCFDI
8
from satcfdi.pacs import sat
9
from .log_tools import print_yaml
10
11
# PUE_PAGADA = 0
12
# PPD_IGNORAR = 1
13
# Record-kind tags: each is used as the first element of the (kind, uuid)
# tuple keys under which LocalDB stores its entries.
# Kind tag for the "notification e-mail sent" flag (see LocalDB.notificada).
EMAIL_NOTIFICADA = 2
14
# Kind tag for the stored SAT status dict (see LocalDB.status_sat).
STATUS_SAT = 3
15
# Kind tag for the "pending" flag (see LocalDB.pendiente).
PENDIENTE = 4
16
17
# Shared SAT client; LocalDBSatCFDI.status_sat calls its status() method.
sat_manager = sat.SAT()
18
19
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
20
# Directory holding the diskcache store ('cache' subfolder) and the pickle
# files written by save_data / read by load_data.
DATA_DIR = '.data'
21
22
23
class LocalDB:
    """Small persistent store of per-UUID records backed by ``diskcache``.

    Every entry lives under a ``(kind, uuid)`` tuple key, where ``kind`` is
    one of the module constants ``PENDIENTE``, ``EMAIL_NOTIFICADA`` or
    ``STATUS_SAT``. Falsy values are never stored: writing one removes the
    entry, and readers fall back to a per-kind default.
    """

    def __init__(self):
        """Open (or create) the cache located in ``DATA_DIR/cache``."""
        cache_path = os.path.join(DATA_DIR, 'cache')
        self.local_storage = diskcache.Cache(cache_path)

    def _read(self, kind, uuid, fallback):
        """Return the value stored for ``(kind, uuid)`` or ``fallback``."""
        return self.local_storage.get((kind, uuid), fallback)

    def _write(self, kind, uuid, value):
        """Store a truthy ``value`` for ``(kind, uuid)``; drop the entry otherwise."""
        key = (kind, uuid)
        if not value:
            # A falsy value means "unset": remove the entry, and treat an
            # already-missing entry as success.
            try:
                del self.local_storage[key]
            except KeyError:
                pass
            return
        self.local_storage[key] = value

    def pendiente(self, uuid: UUID):
        """Return the stored "pending" value for ``uuid`` (``False`` if unset)."""
        return self._read(PENDIENTE, uuid, False)

    def pendiente_set(self, uuid: UUID, value: bool):
        """Set the "pending" value for ``uuid``, or clear it when ``value`` is falsy."""
        self._write(PENDIENTE, uuid, value)

    def notificada(self, uuid: UUID):
        """Return the stored "e-mail notified" value for ``uuid`` (``False`` if unset)."""
        return self._read(EMAIL_NOTIFICADA, uuid, False)

    def notificada_set(self, uuid: UUID, value: bool):
        """Set the "e-mail notified" value for ``uuid``, or clear it when falsy."""
        self._write(EMAIL_NOTIFICADA, uuid, value)

    def status_sat(self, uuid: UUID):
        """Return the stored SAT status dict for ``uuid`` (``{}`` if unset)."""
        return self._read(STATUS_SAT, uuid, {})

    def status_sat_set(self, uuid: UUID, value: dict):
        """Store the SAT status dict for ``uuid``, or clear it when ``value`` is empty."""
        self._write(STATUS_SAT, uuid, value)
68
69
70
class LocalDBSatCFDI(LocalDB):
    """``LocalDB`` variant whose accessors take a CFDI instead of a bare UUID.

    The ``config`` mapping supplies date cut-offs: ``notificar_a_partir``
    and ``pendiente_a_partir`` (the latter indexed by the CFDI's
    ``MetodoPago``). A CFDI dated before/at its cut-off is reported as
    ``True`` without consulting the stored value.
    """

    def __init__(self, config):
        """Open the underlying cache and keep ``config`` for cut-off lookups."""
        super().__init__()
        self.config = config

    def notificada(self, cfdi: SatCFDI):
        """Return ``True`` for CFDIs dated before ``notificar_a_partir``.

        Otherwise return the value stored for ``cfdi.uuid``.
        """
        fecha = cfdi["Fecha"]
        if not fecha < self.config['notificar_a_partir']:
            return super().notificada(cfdi.uuid)
        return True

    def notificada_flip(self, cfdi: SatCFDI):
        """Store and return the negation of ``notificada(cfdi)``."""
        flipped = not self.notificada(cfdi)
        self.notificada_set(cfdi.uuid, flipped)
        return flipped

    def pendiente(self, cfdi: SatCFDI):
        """Return ``True`` for CFDIs dated at or before their payment-method cut-off.

        Otherwise return the value stored for ``cfdi.uuid``.
        """
        fecha = cfdi["Fecha"]
        cutoffs = self.config['pendiente_a_partir']
        if not fecha <= cutoffs[cfdi["MetodoPago"]]:
            return super().pendiente(cfdi.uuid)
        return True

    def pendiente_flip(self, cfdi: SatCFDI):
        """Store and return the negation of ``pendiente(cfdi)``."""
        flipped = not self.pendiente(cfdi)
        self.pendiente_set(cfdi.uuid, flipped)
        return flipped

    def status_sat(self, cfdi: SatCFDI, update=False):
        """Return the SAT status for ``cfdi``.

        With ``update`` false, the stored status is returned (``{}`` when
        none was saved). With ``update`` true, ``sat_manager.status`` is
        queried and its response returned; the response is persisted only
        when its ``ValidacionEFOS`` field equals ``"200"``.
        """
        if not update:
            return super().status_sat(cfdi.uuid)

        response = sat_manager.status(cfdi)
        if response["ValidacionEFOS"] == "200":
            self.status_sat_set(cfdi.uuid, response)
        return response

    def describe(self, cfdi: SatCFDI):
        """Print the three tracked values for ``cfdi`` through ``print_yaml``."""
        summary = {
            'pendiente': self.pendiente(cfdi),
            'email_notificada': self.notificada(cfdi),
            'status_sat': self.status_sat(cfdi)
        }
        print_yaml(summary)
110
111
112
def save_data(file, data):
    """Pickle ``data`` into ``file`` inside ``DATA_DIR``.

    The destination directory is created on demand, so a save performed
    before anything else has created ``DATA_DIR`` no longer fails with
    ``FileNotFoundError``.

    :param file: file name (relative to ``DATA_DIR``) to write
    :param data: any picklable object
    :raises pickle.PicklingError: if ``data`` cannot be pickled
    :raises OSError: if the directory or file cannot be created/written
    """
    path = os.path.join(DATA_DIR, file)
    parent = os.path.dirname(path)
    if parent:
        # exist_ok makes this a no-op when the directory is already there.
        os.makedirs(parent, exist_ok=True)
    with open(path, 'wb') as f:
        pickle.dump(data, f)
115
116
117
def load_data(file, default=None):
    """Unpickle and return the contents of ``file`` inside ``DATA_DIR``.

    :param file: file name (relative to ``DATA_DIR``) to read
    :param default: value returned when the file does not exist
    :return: the unpickled object, or ``default`` if the file is missing
    """
    path = os.path.join(DATA_DIR, file)
    try:
        # NOTE(review): pickle executes arbitrary code on load; this is only
        # safe as long as the files under DATA_DIR are written by this
        # application itself.
        with open(path, 'rb') as stream:
            payload = pickle.load(stream)
    except FileNotFoundError:
        return default
    return payload
123