Passed
Push — master ( 037c10...ba19d9 )
by Jordi
10:46
created

bika.lims.api.mail.send_email()   A

Complexity

Conditions 4

Size

Total Lines 22
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 13
dl 0
loc 22
rs 9.75
c 0
b 0
f 0
cc 4
nop 2
1
# -*- coding: utf-8 -*-
2
3
import mimetypes
4
import os
5
import re
6
import socket
7
from email import encoders
8
from email.header import Header
9
from email.Message import Message
10
from email.mime.base import MIMEBase
11
from email.mime.multipart import MIMEMultipart
12
from email.mime.text import MIMEText
13
from email.Utils import formataddr
14
from smtplib import SMTPException
15
from string import Template
16
from StringIO import StringIO
17
18
from bika.lims import api
19
from bika.lims import logger
20
from Products.CMFPlone.utils import safe_unicode
21
22
# RFC 2822 local-part: dot-atom or quoted-string
23
# characters allowed in atom: A-Za-z0-9!#$%&'*+-/=?^_`{|}~
24
# RFC 2821 domain: max 255 characters
25
_LOCAL_RE = re.compile(r'([A-Za-z0-9!#$%&\'*+\-/=?^_`{|}~]+'
26
                       r'(\.[A-Za-z0-9!#$%&\'*+\-/=?^_`{|}~]+)*|'
27
                       r'"[^(\|")]*")@[^@]{3,255}$')
28
29
# RFC 2821 local-part: max 64 characters
30
# RFC 2821 domain: sequence of dot-separated labels
31
# characters allowed in label: A-Za-z0-9-, first is a letter
32
# Even though the RFC does not allow it all-numeric domains do exist
33
_DOMAIN_RE = re.compile(r'[^@]{1,64}@[A-Za-z0-9][A-Za-z0-9-]*'
34
                        r'(\.[A-Za-z0-9][A-Za-z0-9-]*)+$')
35
36
37
def to_email_address(address, name=""):
38
    """Convert the given address, name pair to an email address
39
40
    :param address: The email address
41
    :type address: basestring
42
    :param name: The real name of the person owning the email address
43
    :type name: basestring
44
    :returns: Email address suitable for an RFC 2822 From, To or Cc header
45
    """
46
    pair = (name, address)
47
    return formataddr(pair)
48
49
50
def to_email_subject(subject):
51
    """Convert the given subject to an email subject
52
53
    :param subject: The email subject
54
    :type subject: basestring
55
    :returns: Encoded email subject header
56
    """
57
    if not isinstance(subject, basestring):
58
        raise TypeError("Expected string, got '{}'".format(type(subject)))
59
    return Header(s=safe_unicode(subject), charset="utf8")
60
61
62
def to_email_body_text(body, **kw):
63
    """Convert the given body template to a text/plain type MIME document
64
65
    :param body: The email body text or template
66
    :type body: basestring
67
    :returns: MIMEText
68
    """
69
    body_template = Template(safe_unicode(body)).safe_substitute(**kw)
70
    return MIMEText(body_template, _subtype="plain", _charset="utf8")
71
72
73
def to_email_attachment(file_or_path, filename="", **kw):
74
    """Create a new MIME Attachment
75
76
    The Content-Type: header is build from the maintype and subtype of the
77
    guessed filename mimetype. Additional parameters for this header are
78
    taken from the keyword arguments.
79
80
    :param file_or_path: OS-level file or absolute path
81
    :type file_or_path: str, FileIO, MIMEBase
82
    :param filename: Filename to use
83
    :type filedata: str
84
    :returns: MIME Attachment
85
    """
86
    filedata = ""
87
    maintype = "application"
88
    subtype = "octet-stream"
89
90
    # Handle attachment
91
    if isinstance(file_or_path, MIMEBase):
92
        # return immediately
93
        return file_or_path
94
    # Handle file/StringIO
95
    elif isinstance(file_or_path, (file, StringIO)):
96
        filedata = file_or_path.read()
97
    # Handle file path
98
    elif os.path.isfile(file_or_path):
99
        filename = filename or os.path.basename(file_or_path)
100
        with open(file_or_path, "r") as f:
101
            # read the filedata from the filepath
102
            filedata = f.read()
103
104
    # Set MIME type from keyword arguments or guess it from the filename
105
    mime_type = kw.pop("mime_type", None) or mimetypes.guess_type(filename)[0]
106
    if mime_type is not None:
107
        maintype, subtype = mime_type.split("/")
108
109
    attachment = MIMEBase(maintype, subtype, **kw)
110
    attachment.set_payload(filedata)
111
    encoders.encode_base64(attachment)
112
    attachment.add_header("Content-Disposition",
113
                          "attachment; filename=%s" % filename)
114
    return attachment
115
116
117
def is_valid_email_address(address):
118
    """Check if the given address is a valid email address
119
120
    Code taken from `CMFDefault.utils.checkEmailAddress`
121
122
    :param address: The email address to check
123
    :type address: basestring
124
    :returns: True if the address is a valid email
125
    """
126
    if not isinstance(address, basestring):
127
        return False
128
    if not _LOCAL_RE.match(address):
129
        return False
130
    if not _DOMAIN_RE.match(address):
131
        return False
132
    return True
133
134
135
def parse_email_address(address):
136
    """Parse a given name/email pair
137
138
    :param address: The name/email string to parse
139
    :type address: basestring
140
    :returns: RFC 2822 email address
141
    """
142
    if not isinstance(address, basestring):
143
        raise ValueError("Expected a string, got {}".format(type(address)))
144
145
    # parse <name>, <email> recipient
146
    splitted = map(lambda s: s.strip(),
147
                   safe_unicode(address).rsplit(",", 1))
148
149
    pair = []
150
    for s in splitted:
151
        if is_valid_email_address(s):
152
            pair.insert(0, s)
153
        else:
154
            pair.append(s)
155
156
    return to_email_address(*pair)
157
158
159
def compose_email(from_addr, to_addr, subj, body, attachments=[], **kw):
160
    """Compose a RFC 2822 MIME message
161
162
    :param from_address: Email from address
163
    :param to_address: List of email or (name, email) pairs
164
    :param subject: Email subject
165
    :param body: Email body
166
    :param attachments: List of email attachments
167
    :returns: MIME message
168
    """
169
    _preamble = "This is a multi-part message in MIME format.\n"
170
    _from = to_email_address(from_addr)
171
    _to = to_email_address(to_addr)
172
    _subject = to_email_subject(subj)
173
    _body = to_email_body_text(body, **kw)
174
175
    # Create the enclosing message
176
    mime_msg = MIMEMultipart()
177
    mime_msg.preamble = _preamble
178
    mime_msg["Subject"] = _subject
179
    mime_msg["From"] = _from
180
    mime_msg["To"] = _to
181
    mime_msg.attach(_body)
182
183
    # Attach attachments
184
    for attachment in attachments:
185
        mime_msg.attach(to_email_attachment(attachment))
186
187
    return mime_msg
188
189
190
def send_email(email, immediate=True):
191
    """Send the email via the MailHost tool
192
193
    :param email: Email message or string
194
    :type email: Message or basestring
195
    :param immediate: True to send the email immediately
196
    :type immediately: bool
197
    :returns: True if the email delivery was successful
198
    """
199
    if not isinstance(email, (basestring, Message)):
200
        raise TypeError("Email must be a Message or basestring")
201
202
    try:
203
        mailhost = api.get_tool("MailHost")
204
        mailhost.send(email, immediate=immediate)
205
    except SMTPException as e:
206
        logger.error(e)
207
        return False
208
    except socket.error as e:
209
        logger.error(e)
210
        return False
211
    return True
212