Passed
Push — main ( f6b891...c270b1 )
by Sat CFDI
01:48
created

LocalDBSatCFDI.pue_pagada()   A

Complexity

Conditions 2

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 2
nop 2
1
import logging
2
import os
3
import pickle
4
from uuid import UUID
5
6
import diskcache
7
from satcfdi.accounting import SatCFDI
8
from satcfdi.pacs import sat
9
from .log_tools import print_yaml
10
11
# Record-kind tags. Each one is used as the first element of the
# ``(kind, uuid)`` tuple keys under which LocalDB stores its values, so the
# numeric values must stay stable for already-stored entries to be found.
PUE_PAGADA, PPD_IGNORAR, EMAIL_NOTIFICADA, STATUS_SAT = 0, 1, 2, 3

# Directory (relative to the working directory) holding the cache and the
# pickled data files written by this module.
DATA_DIR = '.data'

logger = logging.getLogger(__name__)

# Shared SAT client used for status queries.
sat_manager = sat.SAT()
20
21
22
class LocalDB:
    """Per-UUID flags and SAT status kept in a ``diskcache.Cache``.

    Every entry is stored under a ``(kind, uuid)`` tuple key, where ``kind``
    is one of the module-level constants ``PUE_PAGADA``, ``PPD_IGNORAR``,
    ``EMAIL_NOTIFICADA`` or ``STATUS_SAT``.
    """

    def __init__(self):
        """Open the cache located at ``DATA_DIR/cache``."""
        cache_path = os.path.join(DATA_DIR, 'cache')
        self.local_storage = diskcache.Cache(cache_path)

    def _store(self, key, value):
        """Save *value* under *key*; a falsy *value* removes the entry.

        Removing an entry that does not exist is not an error.
        """
        if not value:
            try:
                del self.local_storage[key]
            except KeyError:
                # Nothing stored for this key, so it is already "unset".
                pass
            return
        self.local_storage[key] = value

    def pue_pagada(self, uuid: UUID):
        """Return the stored PUE_PAGADA value for *uuid*, or False if unset."""
        key = (PUE_PAGADA, uuid)
        return self.local_storage.get(key, False)

    def pue_pagada_set(self, uuid: UUID, value: bool):
        """Store the PUE_PAGADA value for *uuid*; a falsy value clears it."""
        self._store((PUE_PAGADA, uuid), value)

    def ppd_ignorar(self, uuid: UUID):
        """Return the stored PPD_IGNORAR value for *uuid*, or False if unset."""
        key = (PPD_IGNORAR, uuid)
        return self.local_storage.get(key, False)

    def ppd_ignorar_set(self, uuid: UUID, value: bool):
        """Store the PPD_IGNORAR value for *uuid*; a falsy value clears it."""
        self._store((PPD_IGNORAR, uuid), value)

    def email_notificada(self, uuid: UUID):
        """Return the stored EMAIL_NOTIFICADA value for *uuid*, or False."""
        key = (EMAIL_NOTIFICADA, uuid)
        return self.local_storage.get(key, False)

    def email_notificada_set(self, uuid: UUID, value: bool):
        """Store the EMAIL_NOTIFICADA value for *uuid*; falsy clears it."""
        self._store((EMAIL_NOTIFICADA, uuid), value)

    def status_sat(self, uuid: UUID):
        """Return the stored SAT status for *uuid*, or ``{}`` if unset."""
        key = (STATUS_SAT, uuid)
        return self.local_storage.get(key, {})

    def status_sat_set(self, uuid: UUID, value: dict):
        """Store the SAT status for *uuid*; a falsy/empty value clears it."""
        self._store((STATUS_SAT, uuid), value)
81
82
83
class LocalDBSatCFDI(LocalDB):
    """LocalDB variant whose accessors take a CFDI instead of a bare UUID.

    For the three boolean flags, a CFDI whose ``"Fecha"`` is less than or
    equal to the matching ``*_hasta`` entry of ``config`` is reported as
    True without consulting the cache.
    """

    def __init__(self, config):
        """Open the cache and keep *config* for the cutoff lookups.

        *config* is indexed with the keys ``'email_notificada_hasta'``,
        ``'pue_pagada_hasta'`` and ``'ppd_ignorar_hasta'``.
        """
        super().__init__()
        self.config = config

    def _up_to_cutoff(self, cfdi, config_key):
        """Compare ``cfdi["Fecha"]`` (<=) against ``config[config_key]``."""
        fecha = cfdi["Fecha"]
        return fecha <= self.config[config_key]

    def email_notificada(self, cfdi: SatCFDI):
        """Return True up to the configured cutoff, else the stored flag."""
        if self._up_to_cutoff(cfdi, 'email_notificada_hasta'):
            return True
        return super().email_notificada(cfdi.uuid)

    def email_notificada_flip(self, cfdi: SatCFDI):
        """Store the negation of the current flag and return the new value."""
        toggled = not self.email_notificada(cfdi)
        self.email_notificada_set(cfdi.uuid, toggled)
        return toggled

    def pue_pagada(self, cfdi: SatCFDI):
        """Return True up to the configured cutoff, else the stored flag."""
        if self._up_to_cutoff(cfdi, 'pue_pagada_hasta'):
            return True
        return super().pue_pagada(cfdi.uuid)

    def pue_pagada_flip(self, cfdi: SatCFDI):
        """Store the negation of the current flag and return the new value."""
        toggled = not self.pue_pagada(cfdi)
        self.pue_pagada_set(cfdi.uuid, toggled)
        return toggled

    def ppd_ignorar(self, cfdi: SatCFDI):
        """Return True up to the configured cutoff, else the stored flag."""
        if self._up_to_cutoff(cfdi, 'ppd_ignorar_hasta'):
            return True
        return super().ppd_ignorar(cfdi.uuid)

    def ppd_ignorar_flip(self, cfdi: SatCFDI):
        """Store the negation of the current flag and return the new value."""
        toggled = not self.ppd_ignorar(cfdi)
        self.ppd_ignorar_set(cfdi.uuid, toggled)
        return toggled

    def status_sat(self, cfdi: SatCFDI, update=False):
        """Return the SAT status of *cfdi*.

        With ``update`` false, the value stored in the cache is returned
        (``{}`` when nothing is stored). With ``update`` true, the status is
        requested through ``sat_manager.status`` and returned; it is written
        to the cache only when its ``"ValidacionEFOS"`` entry equals
        ``"200"``.
        """
        if not update:
            return super().status_sat(cfdi.uuid)

        fresh = sat_manager.status(cfdi)
        if fresh["ValidacionEFOS"] == "200":
            self.status_sat_set(cfdi.uuid, fresh)
        return fresh

    def describe(self, cfdi: SatCFDI):
        """Print the three flags and the cached SAT status via print_yaml."""
        summary = {
            'pue_pagada': self.pue_pagada(cfdi),
            'ppd_ignorar': self.ppd_ignorar(cfdi),
            'email_notificada': self.email_notificada(cfdi),
            'status_sat': self.status_sat(cfdi),
        }
        print_yaml(summary)
134
135
136
def save_data(file, data):
    """Pickle *data* into the file named *file* inside ``DATA_DIR``.

    The destination directory is created first when it does not exist, so
    the call also works before anything else has created ``DATA_DIR``.

    Args:
        file: File name (or relative path) below ``DATA_DIR``.
        data: Any picklable object.
    """
    path = os.path.join(DATA_DIR, file)
    parent = os.path.dirname(path)
    if parent:
        # open(..., 'wb') does not create missing parent directories.
        os.makedirs(parent, exist_ok=True)
    with open(path, 'wb') as f:
        pickle.dump(data, f)
139
140
141
def load_data(file, default=None):
    """Unpickle and return the contents of *file* inside ``DATA_DIR``.

    Returns *default* when a ``FileNotFoundError`` is raised while opening
    or loading the file.
    """
    source = os.path.join(DATA_DIR, file)
    try:
        with open(source, 'rb') as handle:
            # NOTE(review): unpickling can execute arbitrary code; this is
            # only safe for files this application wrote itself.
            loaded = pickle.load(handle)
            return loaded
    except FileNotFoundError:
        return default
147