Passed
Push — 2.x ( 990d46...24d087 )
by Ramon
09:37 queued 03:54
created

bika.lims.api.mail.to_email_attachment()   C

Complexity

Conditions 9

Size

Total Lines 57
Code Lines 33

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 33
dl 0
loc 57
rs 6.6666
c 0
b 0
f 0
cc 9
nop 3

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

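Commonly applied refactorings include Extract Method: move a cohesive group of statements into its own well-named method and call it from the original one.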

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import io
22
import mimetypes
23
import os
24
import re
25
import six
26
import socket
27
from email import encoders
28
from email.header import Header
29
from email.Message import Message
30
from email.mime.base import MIMEBase
31
from email.mime.multipart import MIMEMultipart
32
from email.mime.text import MIMEText
33
from email.Utils import formataddr
34
from email.Utils import parseaddr
35
from smtplib import SMTPException
36
from string import Template
37
from six import StringIO
38
39
from bika.lims import api
40
from bika.lims import logger
41
from plone.app.blob.field import BlobWrapper
42
from Products.CMFPlone.utils import safe_unicode
43
44
# Types treated as readable file objects when building attachments.
# Python 2 has a builtin ``file`` type; on Python 3 the name does not exist,
# the lookup raises NameError and only ``io.IOBase`` is used.  Static
# analysers therefore report ``file`` as undefined -- that is expected, hence
# the ``noqa`` marker.
try:
    file_types = (file, io.IOBase)  # noqa: F821
except NameError:
    file_types = (io.IOBase,)
48
49
# Local part check (the text before the "@"), after RFC 2822: either a
# dot-atom (runs of atom characters separated by single dots) or a
# double-quoted string.
# Atom characters: A-Za-z0-9!#$%&'*+-/=?^_`{|}~
# Whatever follows the "@" is only length-checked here: 3 to 255 characters
# that are not "@" (RFC 2821 caps a domain at 255 characters).
_LOCAL_RE = re.compile(
    r'([A-Za-z0-9!#$%&\'*+\-/=?^_`{|}~]+'
    r'(\.[A-Za-z0-9!#$%&\'*+\-/=?^_`{|}~]+)*|'
    r'"[^(\|")]*")@[^@]{3,255}$'
)
55
56
# Domain check (the text after the "@"), after RFC 2821: two or more
# dot-separated labels made of A-Za-z0-9 and "-".
# The RFC wants a label to start with a letter, but all-numeric domains exist
# in practice, so a leading digit is accepted as well.
# The part before the "@" is only length-checked here: 1 to 64 characters
# that are not "@" (RFC 2821 caps the local part at 64 characters).
_DOMAIN_RE = re.compile(
    r'[^@]{1,64}@[A-Za-z0-9][A-Za-z0-9-]*'
    r'(\.[A-Za-z0-9][A-Za-z0-9-]*)+$'
)
62
63
64
def to_email_address(address, name=""):
    """Build a header-ready email address from an address and a real name

    :param address: The plain email address
    :type address: basestring
    :param name: Optional real name of the owner of the email address
    :type name: basestring
    :returns: The (name, address) pair formatted by `formataddr`, suitable
        for an RFC 2822 From, To or Cc header
    """
    return formataddr((name, address))
75
76
77
def parse_email_address(address):
    """Split a combined name/email string into its two parts

    :param address: The name/email string to split
    :type address: basestring
    :returns: Tuple of (name, email) as produced by `parseaddr`
    :raises ValueError: if the given address is not a string
    """
    if isinstance(address, six.string_types):
        return parseaddr(address)
    raise ValueError("Expected a string, got {}".format(type(address)))
87
88
89
def to_email_subject(subject):
    """Wrap the given subject into an email header object

    :param subject: The email subject
    :type subject: basestring
    :returns: `Header` built from the unicode subject with charset "utf8"
    :raises TypeError: if the given subject is not a string
    """
    if isinstance(subject, six.string_types):
        unicode_subject = safe_unicode(subject)
        return Header(s=unicode_subject, charset="utf8")
    raise TypeError("Expected string, got '{}'".format(type(subject)))
99
100
101
def to_email_body_text(body, **kw):
    """Render the body template into a text/plain MIME document

    The body is treated as a `string.Template`; its placeholders are filled
    from the keyword arguments with `safe_substitute`, so placeholders
    without a matching keyword are left in the text.

    :param body: The email body text or template
    :type body: basestring
    :returns: MIMEText with subtype "plain" and charset "utf8"
    """
    template = Template(safe_unicode(body))
    rendered = template.safe_substitute(**kw)
    return MIMEText(rendered, _subtype="plain", _charset="utf8")
110
111
112
def _read_attachment_source(filedata, filename, mime_type):
    """Extract payload, filename and MIME type from an attachment source

    :param filedata: File object, file path, raw data or blob wrapper
    :param filename: Filename given by the caller (may be empty)
    :param mime_type: MIME type given by the caller (may be None)
    :returns: Tuple of (data, filename, mime_type); filename and mime_type
        are only filled from the source when the caller did not provide them
    """
    data = ""

    def is_file(path):
        # values that cannot be interpreted as a path make os.path raise
        try:
            return os.path.isfile(path)
        except (TypeError, ValueError):
            return False

    # Handle file/StringIO
    if isinstance(filedata, (file_types, StringIO)):
        data = filedata.read()
    # Handle file paths
    elif is_file(filedata):
        filename = filename or os.path.basename(filedata)
        # binary mode: attachments are frequently not text (PDF, images)
        with open(filedata, "rb") as f:
            data = f.read()
    # Handle raw filedata
    elif isinstance(filedata, six.string_types):
        data = filedata
    # Handle wrapper for zodb blob
    elif isinstance(filedata, BlobWrapper):
        filename = filename or filedata.getFilename()
        mime_type = mime_type or filedata.getContentType()
        data = filedata.data

    return data, filename, mime_type


def to_email_attachment(filedata, filename="", **kw):
    """Create a new MIME Attachment

    The Content-Type: header is built from the maintype and subtype of the
    given or guessed MIME type ("application/octet-stream" if neither is
    available). Additional parameters for this header are taken from the
    keyword arguments.

    :param filedata: File, file path, filedata
    :type filedata: FileIO, MIMEBase, str
    :param filename: Filename to use
    :type filename: str
    :param mime_type: Optional keyword "maintype/subtype"; takes precedence
        over the type guessed from the filename
    :returns: MIME Attachment; a given MIMEBase is returned unchanged
    """
    # An already prepared MIME part is passed through untouched
    if isinstance(filedata, MIMEBase):
        return filedata

    maintype = "application"
    subtype = "octet-stream"
    # mime_type must not reach MIMEBase as a Content-Type parameter
    mime_type = kw.pop("mime_type", None)

    data, filename, mime_type = _read_attachment_source(
        filedata, filename, mime_type)

    # Set MIME type from keyword arguments or guess it from the filename
    mime_type = mime_type or mimetypes.guess_type(filename)[0]
    if mime_type and "/" in mime_type:
        # split once, so that a malformed value cannot break the unpacking
        maintype, subtype = mime_type.split("/", 1)

    attachment = MIMEBase(maintype, subtype, **kw)
    attachment.set_payload(data)
    encoders.encode_base64(attachment)
    # Passing the filename as a header parameter gets it quoted, so names
    # containing spaces or other special characters stay intact
    attachment.add_header("Content-Disposition", "attachment",
                          filename=filename)
    return attachment
169
170
171
def is_valid_email_address(address):
    """Tell whether the given value looks like a valid email address

    Code taken from `CMFDefault.utils.checkEmailAddress`

    :param address: The email address to check
    :type address: str
    :returns: True if the address is a string that matches both the
        local-part and the domain pattern, otherwise False
    """
    if not isinstance(address, six.string_types):
        return False
    # both patterns are anchored on the full address and must match
    patterns = (_LOCAL_RE, _DOMAIN_RE)
    return all(pattern.match(address) for pattern in patterns)
187
188
189
def compose_email(from_addr, to_addr, subj, body, attachments=None, **kw):
    """Compose a RFC 2822 MIME message

    :param from_addr: Email from address
    :param to_addr: An email or a list of emails
    :param subj: Email subject
    :param body: Email body text or template; the additional keyword
        arguments are used to fill the template placeholders
    :param attachments: List of email attachments; each item is converted
        with `to_email_attachment`. None means no attachments
    :returns: MIME multipart message
    """
    # None instead of a mutable list as default argument
    if attachments is None:
        attachments = []

    _preamble = "This is a multi-part message in MIME format.\n"
    _from = to_email_address(from_addr)
    # a single recipient may be given as a plain string
    if isinstance(to_addr, six.string_types):
        to_addr = [to_addr]
    _to = [to_email_address(addr) for addr in to_addr]
    _subject = to_email_subject(subj)
    _body = to_email_body_text(body, **kw)

    # Create the enclosing message
    mime_msg = MIMEMultipart()
    mime_msg.preamble = _preamble
    mime_msg["Subject"] = _subject
    mime_msg["From"] = _from
    mime_msg["To"] = ", ".join(_to)
    mime_msg.attach(_body)

    # Attach attachments
    for attachment in attachments:
        mime_msg.attach(to_email_attachment(attachment))

    return mime_msg
220
221
222
def send_email(email, immediate=True):
    """Hand the email over to the MailHost tool for delivery

    :param email: Email message or string
    :type email: Message or str
    :param immediate: True to send the email immediately
    :type immediate: bool
    :returns: True if the MailHost accepted the email, False if an
        SMTPException or a socket error was raised (the error is logged)
    :raises TypeError: if the email is neither a Message nor a string
    """
    accepted_types = (six.string_types, Message)
    if not isinstance(email, accepted_types):
        raise TypeError("Email must be a Message or str")

    try:
        mailhost = api.get_tool("MailHost")
        mailhost.send(email, immediate=immediate)
    except (SMTPException, socket.error) as e:
        # delivery problems are reported to the caller, not raised
        logger.error(e)
        return False
    return True
244