satdigitalinvoice.email.EmailSender.send_email()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 20
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 16
dl 0
loc 20
rs 9.6
c 0
b 0
f 0
cc 4
nop 5
1
import base64
2
import imaplib
3
import smtplib
4
from email.mime.application import MIMEApplication
5
from email.mime.multipart import MIMEMultipart
6
from email.mime.text import MIMEText
7
from os.path import basename
8
import email
9
import email.utils
10
import logging
11
12
logger = logging.getLogger(__name__)
13
14
15
class EmailManager:
16
    def __init__(self, stmp_host, stmp_port, imap_host, imap_port, user, password, xoauth2_token=None):
17
        # self.receiver = EmailReceiver(
18
        #     host=imap_host,
19
        #     port=imap_port,
20
        #     user=user,
21
        #     password=password,
22
        #     xoauth2_token=xoauth2_token
23
        # )
24
        self.sender = EmailSender(
25
            host=stmp_host,
26
            port=stmp_port,
27
            user=user,
28
            password=password,
29
            xoauth2_token=xoauth2_token
30
        )
31
32
33
# class EmailReceiver:
34
#     def __init__(self, host, port, user, password):
35
#         self.host = host
36
#         self.port = port
37
#         self.user = user
38
#         self.password = password
39
#
40
#     def __enter__(self):
41
#         self.server = imaplib.IMAP4_SSL(self.host, self.port)
42
#         self.server.login(self.user, password=self.password)
43
#
44
#         return self.server
45
#
46
#     def __exit__(self, exc_type, exc, exc_tb):
47
#         self.server.close()
48
#
49
#     def get_mail_attachments(self, mailbox):
50
#         with self as mail:
51
#             mail.select(
52
#                 mailbox=mailbox,
53
#                 readonly=False
54
#             )
55
#
56
#             typ, data = mail.search(None, 'UnSeen')  # mail.search(None, 'ALL')
57
#             logger.info("%s %s", typ, data)
58
#
59
#             for num in data[0].split():
60
#                 typ, data = mail.fetch(num, '(RFC822)')
61
#                 raw_email = data[0][1]
62
#                 # print('Message %s\n%s\n' % (num, raw_email))
63
#
64
#                 # converts byte literal to string removing b''
65
#                 raw_email_string = raw_email.decode('utf-8')
66
#                 email_message = email.message_from_string(raw_email_string)
67
#
68
#                 xml_data = None
69
#                 pdf_data = None
70
#
71
#                 for part in email_message.walk():
72
#                     # this part comes from the snipped I don't understand yet...
73
#                     if part.get_content_maintype() == 'multipart':
74
#                         continue
75
#                     if part.get('Content-Disposition') is None:
76
#                         continue
77
#
78
#                     file_name = part.get_filename()
79
#
80
#                     if file_name:
81
#                         if file_name.endswith(".xml"):
82
#                             if xml_data:
83
#                                 yield xml_data, pdf_data
84
#                                 pdf_data = None
85
#                             xml_data = part.get_payload(decode=True)
86
#                         elif file_name.endswith(".pdf"):
87
#                             if pdf_data:
88
#                                 yield xml_data, pdf_data
89
#                                 xml_data = None
90
#                             pdf_data = part.get_payload(decode=True)
91
#
92
#                 if xml_data or pdf_data:
93
#                     yield xml_data, pdf_data
94
#
95
#                 # mail.store(
96
#                 #     message_set=num,
97
#                 #     command='+FLAGS',
98
#                 #     flags='\\Seen'
99
#                 # )
100
101
102
def generate_oauth2_string(username, access_token) -> str:
103
    auth_string = 'user=' + username + '\1auth=Bearer ' + access_token + '\1\1'
104
    return base64.b64encode(auth_string.encode()).decode()
105
106
107
class EmailSender:
108
    def __init__(self, host, port, user, password, xoauth2_token=None):
109
        self.host = host
110
        self.port = port
111
        self.user = user
112
        self.password = password
113
        self.xoauth2_token = xoauth2_token
114
115
    def __enter__(self):
116
        self.server = smtplib.SMTP(host=self.host, port=self.port)
117
        self.server.starttls()  # Puts connection to SMTP server in TLS mode
118
119
        if self.xoauth2_token:
120
            self.server.ehlo_or_helo_if_needed()
121
            self.server.docmd('AUTH', 'XOAUTH2 ' + generate_oauth2_string(self.user, self.xoauth2_token()))
122
        else:
123
            self.server.login(
124
                user=self.user,
125
                password=self.password
126
            )
127
        return self
128
129
    # exc_type, exc, exc_tb
130
    def __exit__(self, exc_type, exc, exc_tb):
131
        self.server.close()
132
133
    def send_email(self, subject: str, to_addrs: list, html: str = None, file_attachments=None):
134
        msg = MIMEMultipart()
135
        msg['From'] = self.user
136
        msg['To'] = ", ".join(to_addrs)
137
        msg['Subject'] = subject
138
        if html:
139
            msg.attach(MIMEText(html, _subtype='html'))
140
141
        for f in file_attachments or []:
142
            with open(f, "rb") as fil:
143
                part = MIMEApplication(
144
                    fil.read(),
145
                    Name=basename(f)
146
                )
147
            # After the file is closed
148
            part['Content-Disposition'] = 'attachment; filename="%s"' % basename(f)
149
            msg.attach(part)
150
151
        self.server.send_message(
152
            msg=msg
153
        )
154