Passed
Push — main ( 3a643b...e33dea )
by Sat CFDI
01:53
created

LocalDBSatCFDI.status_sat()   A

Complexity

Conditions 3

Size

Total Lines 8
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 8
rs 10
c 0
b 0
f 0
cc 3
nop 3
1
import logging
2
import os
3
import pickle
4
from uuid import UUID
5
6
import diskcache
7
from satcfdi.accounting import SatCFDI
8
from satcfdi.pacs import sat
9
from .log_tools import print_yaml
10
11
# Integer tags used as the first element of the ``(tag, uuid)`` keys under
# which LocalDB stores its records: 0, 1, 2 and 3 respectively.
PUE_PAGADA, PPD_IGNORAR, EMAIL_NOTIFICADA, STATUS_SAT = range(4)

# Module-wide SAT client; LocalDBSatCFDI.status_sat() calls its ``status``.
sat_manager = sat.SAT()

logger = logging.getLogger(__name__)

# Relative directory that holds the disk cache and the pickled data files.
DATA_DIR = '.data'
20
21
22
class LocalDB:
    """Persistent per-UUID record store backed by a ``diskcache.Cache``.

    Each record lives under a ``(tag, uuid)`` key, where ``tag`` is one of
    the module-level constants ``PUE_PAGADA``, ``PPD_IGNORAR``,
    ``EMAIL_NOTIFICADA`` or ``STATUS_SAT``. Storing a falsy value removes
    the record, so a missing record and a falsy record are equivalent.
    """

    def __init__(self):
        """Open the cache located at ``<DATA_DIR>/cache``."""
        self.local_storage = diskcache.Cache(os.path.join(DATA_DIR, 'cache'))

    def _record_get(self, tag, uuid, default):
        """Return the record stored under ``(tag, uuid)`` or ``default``."""
        return self.local_storage.get((tag, uuid), default)

    def _record_set(self, tag, uuid, value):
        """Store a truthy ``value`` under ``(tag, uuid)``; otherwise drop it."""
        key = (tag, uuid)
        if value:
            self.local_storage[key] = value
            return
        try:
            del self.local_storage[key]
        except KeyError:
            # Nothing was stored for this key; clearing is a no-op.
            pass

    def pue_pagada(self, uuid: UUID):
        """Return the ``PUE_PAGADA`` record for ``uuid`` (``False`` if unset)."""
        return self._record_get(PUE_PAGADA, uuid, False)

    def pue_pagada_set(self, uuid: UUID, value: bool):
        """Set the ``PUE_PAGADA`` record for ``uuid``; a falsy value clears it."""
        self._record_set(PUE_PAGADA, uuid, value)

    def ppd_ignorar(self, uuid: UUID):
        """Return the ``PPD_IGNORAR`` record for ``uuid`` (``False`` if unset)."""
        return self._record_get(PPD_IGNORAR, uuid, False)

    def ppd_ignorar_set(self, uuid: UUID, value: bool):
        """Set the ``PPD_IGNORAR`` record for ``uuid``; a falsy value clears it."""
        self._record_set(PPD_IGNORAR, uuid, value)

    def email_notificada(self, uuid: UUID):
        """Return the ``EMAIL_NOTIFICADA`` record for ``uuid`` (``False`` if unset)."""
        return self._record_get(EMAIL_NOTIFICADA, uuid, False)

    def email_notificada_set(self, uuid: UUID, value: bool):
        """Set the ``EMAIL_NOTIFICADA`` record for ``uuid``; a falsy value clears it."""
        self._record_set(EMAIL_NOTIFICADA, uuid, value)

    def status_sat(self, uuid: UUID):
        """Return the ``STATUS_SAT`` dict for ``uuid`` (a new ``{}`` if unset)."""
        return self._record_get(STATUS_SAT, uuid, {})

    def status_sat_set(self, uuid: UUID, value: dict):
        """Set the ``STATUS_SAT`` dict for ``uuid``; an empty/falsy value clears it."""
        self._record_set(STATUS_SAT, uuid, value)
81
82
83
class LocalDBSatCFDI(LocalDB):
    """``LocalDB`` variant whose lookups take a ``SatCFDI`` instead of a UUID.

    For the three boolean lookups, a CFDI whose ``"Fecha"`` is not later than
    the matching ``*_hasta`` entry of ``config`` is reported as ``True``
    without consulting the stored record.
    """

    def __init__(self, config):
        """Open the underlying store and keep ``config`` for cutoff lookups."""
        super().__init__()
        self.config = config

    def email_notificada(self, cfdi: SatCFDI):
        """Return ``True`` at or before ``email_notificada_hasta``, else the stored record."""
        issued = cfdi["Fecha"]
        cutoff = self.config['email_notificada_hasta']
        if not issued <= cutoff:
            return super().email_notificada(cfdi.uuid)
        return True

    def pue_pagada(self, cfdi: SatCFDI):
        """Return ``True`` at or before ``pue_pagada_hasta``, else the stored record."""
        issued = cfdi["Fecha"]
        cutoff = self.config['pue_pagada_hasta']
        if not issued <= cutoff:
            return super().pue_pagada(cfdi.uuid)
        return True

    def ppd_ignorar(self, cfdi: SatCFDI):
        """Return ``True`` at or before ``ppd_ignorar_hasta``, else the stored record."""
        issued = cfdi["Fecha"]
        cutoff = self.config['ppd_ignorar_hasta']
        if not issued <= cutoff:
            return super().ppd_ignorar(cfdi.uuid)
        return True

    def status_sat(self, cfdi: SatCFDI, update=False):
        """Return the SAT status for ``cfdi``.

        With ``update`` false, the stored record is returned. With ``update``
        true, ``sat_manager.status(cfdi)`` is called and its result returned;
        the result is stored only when its ``"ValidacionEFOS"`` equals ``"200"``.
        """
        if not update:
            return super().status_sat(cfdi.uuid)

        response = sat_manager.status(cfdi)
        # Only responses carrying the "200" validation code are persisted.
        if response["ValidacionEFOS"] == "200":
            self.status_sat_set(cfdi.uuid, response)
        return response

    def describe(self, cfdi: SatCFDI):
        """Print the four locally known values for ``cfdi`` via ``print_yaml``."""
        summary = {
            'pue_pagada': self.pue_pagada(cfdi),
            'ppd_ignorar': self.ppd_ignorar(cfdi),
            'email_notificada': self.email_notificada(cfdi),
            'status_sat': self.status_sat(cfdi),
        }
        print_yaml(summary)
119
120
121
def save_data(file, data):
    """Pickle ``data`` into ``<DATA_DIR>/<file>``, overwriting any existing file.

    The destination directory is created when missing, so the call also
    works before anything else has created ``DATA_DIR``.
    """
    path = os.path.join(DATA_DIR, file)
    parent = os.path.dirname(path)
    if parent:
        # Without this, open() raises FileNotFoundError on a fresh checkout.
        os.makedirs(parent, exist_ok=True)
    with open(path, 'wb') as f:
        pickle.dump(data, f)
124
125
126
def load_data(file, default=None):
    """Unpickle and return the contents of ``<DATA_DIR>/<file>``.

    Returns ``default`` when a ``FileNotFoundError`` is raised while opening
    or loading the file.

    NOTE(review): ``pickle.load`` can execute arbitrary code; this should only
    ever read files written by ``save_data`` -- confirm no external files land
    in ``DATA_DIR``.
    """
    path = os.path.join(DATA_DIR, file)
    try:
        with open(path, 'rb') as stream:
            payload = pickle.load(stream)
    except FileNotFoundError:
        return default
    return payload
132