Passed
Push — main ( 4b8065...a02c75 )
by Alexander
01:39
created

MailboxCleanerMessage.download_attachment()   B

Complexity

Conditions 5

Size

Total Lines 26
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 19
nop 3
dl 0
loc 26
rs 8.9833
c 0
b 0
f 0
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
"""
5
Module to download and to detach/strip/remove attachments
6
from e-mails on IMAP servers.
7
"""
8
9
from __future__ import print_function
10
11
import email
12
import email.mime.text
13
import email.utils
14
from email.parser import HeaderParser
15
import hashlib
16
import logging
17
import os.path
18
import re
19
import shutil
20
import tempfile
21
import time
22
import unicodedata
23
import src.emlx2eml
24
25
# pylint: disable=R0801
26
__author__ = "Alexander Willner"
27
__copyright__ = "Copyright 2020, Alexander Willner"
28
__credits__ = ["github.com/guido4000",
29
               "github.com/halteproblem", "github.com/jamesridgway"]
30
__license__ = "MIT"
31
__version__ = "1.0.0"
32
__maintainer__ = "Alexander Willner"
33
__email__ = "[email protected]"
34
__status__ = "Development"
35
36
37
class MailboxCleanerMessage():
    """
    Class to represent an e-mail.

    Bundles the operations needed to save the attachments of a parsed
    e-mail into ``args.target`` and to replace them inside the message
    with a short text placeholder.

    The ``args`` object passed to the constructor is read for the
    attributes ``target``, ``min_size``, ``skip_download``, ``detach``
    and ``upload``.
    """

    # Text that replaces the payload of a detached attachment; filled in
    # with the keys ``newfile``, ``filename``, ``size`` and ``type``.
    _PLACEHOLDER = """
===========================================================
This message contained an attachment that was stripped out.
The attachment was stored using the file name: "%(newfile)s".
The original file name was: "%(filename)s".
The original size was: %(size)d KB.
The original type was: %(type)s.
Tool: https://mailboxcleanup.netcee.de
===========================================================
"""

    def __init__(self, args):
        self.args = args

    def download_and_detach_attachments(self, msg):
        """Download attachments and remove them from the mail.

        Walks over all parts of ``msg``; every detachable part is saved
        via ``download_attachment`` and, if ``args.detach`` is set,
        replaced by a placeholder via ``detach_attachment``.

        Returns True if at least one part of ``msg`` was rewritten.
        """

        modified = False

        # The timestamp only depends on the message, so compute it once
        # instead of once per part.
        date = self._get_timestamp(msg)

        # Iterate over each part of the email
        for part in msg.walk():
            if self.is_non_detachable_part(part):
                continue

            # Only download in relevant mode
            target = self.download_attachment(part, date)
            if target is None:
                continue

            # Only detach in relevant mode
            if not self.args.detach:
                logging.debug('      Detaching\t: skipped (disabled)')
                continue

            self.detach_attachment(part, target)
            modified = True

        return modified

    @staticmethod
    def _get_timestamp(msg) -> float:
        """Return the 'Date' header of ``msg`` as seconds since the epoch.

        The UTC offset given in the header is honoured. If the header is
        missing or cannot be parsed, the current time is returned so that
        a broken header does not abort processing of the message.
        """

        raw_date = msg.get('date')
        parsed = None
        if raw_date:
            try:
                parsed = email.utils.parsedate_tz(str(raw_date))
            except (TypeError, ValueError, IndexError):
                parsed = None

        if parsed is not None:
            try:
                return float(email.utils.mktime_tz(parsed))
            except (TypeError, ValueError, OverflowError):
                logging.debug('    Date\t: out of range (%s)', raw_date)

        logging.debug('    Date\t: missing or invalid, using current time')
        return time.time()

    def is_non_detachable_part(self, part):
        """Only process certain types and sizes of attachments.

        Returns True for parts that must be left alone: multipart
        containers, parts without a 'Content-Disposition' header and
        parts whose serialized size in KB is not above ``args.min_size``.
        """

        msg_size = len(str(part)) / 1024
        maintype = part.get_content_maintype()
        logging.debug('    Part\t: %d KB / %d KB (type: %s)',
                      msg_size, self.args.min_size, maintype)

        return maintype == 'multipart' or \
            part.get('Content-Disposition') is None or \
            msg_size <= self.args.min_size

    def download_attachment(self, part, date) -> str:
        """Download the attachment from a part of an email.

        ``date`` is the timestamp (seconds since the epoch) applied to
        the stored file.

        Returns the path of the stored file, an empty string if
        downloading is disabled via ``args.skip_download``, or None if
        the part was skipped (unknown file name or no decodable payload).
        """

        if self.args.skip_download:
            logging.info('      Downl.\t: skipped (disabled)')
            return ""

        file_attached = self.convert_filename(part.get_filename())

        if file_attached == "unknown":
            logging.warning('Warning\t: Unknown attachment '
                            '(skipping this attachment)')
            return None

        # get_payload(decode=True) yields None for container parts such as
        # message/rfc822; there is nothing that could be written then.
        payload = part.get_payload(decode=True)
        if payload is None:
            logging.warning('Warning\t: No decodable payload in "%s" '
                            '(skipping this attachment)', file_attached)
            return None

        # makedirs also creates missing parent directories and does not
        # fail if the directory appeared in the meantime.
        os.makedirs(self.args.target, exist_ok=True)

        with tempfile.NamedTemporaryFile() as file_temp:
            logging.info('      Downl.\t: "%s" (%s)',
                         file_attached, part.get_content_maintype())
            logging.debug('      Downl.\t: To "%s"', file_temp.name)
            file_temp.write(payload)
            file_temp.flush()
            target = self._copy_file(file_temp.name, file_attached, date)

        return target

    def _copy_file(self, source, target_name, date, iterator=0) -> str:
        """Copy file, check for duplicates via hash value.

        Copies ``source`` to ``args.target`` using ``target_name``. If a
        file with that name and different content already exists, the
        suffixes "-1", "-2", ... are tried (starting at ``iterator``)
        until a free name or a file with identical content is found.

        Returns the path that finally holds the content of ``source``.
        """

        target_base, target_extension = os.path.splitext(target_name)
        source_hash = None
        index = iterator

        while True:
            suffix = "-" + str(index) if index > 0 else ""
            target = os.path.join(self.args.target,
                                  target_base + suffix + target_extension)
            if index == 0:
                logging.debug('      Moving\t: From "%s" to "%s".',
                              source, target)

            if not os.path.isfile(target):
                shutil.copy2(source, target)
                os.utime(target, (date, date))
                return target

            # Hash the source only once, no matter how many candidates
            # need to be compared.
            if source_hash is None:
                source_hash = MailboxCleanerMessage.get_hash(source)

            if source_hash == MailboxCleanerMessage.get_hash(target):
                logging.debug('      Moving\t: Already exists (same hash)')
                return target

            if index == 0:
                logging.debug(
                    '      Conflict\t: Resolving same file / other hash..')
            index += 1

    def process_directory(self, handler, folder=None):
        """Upload messages from a local directory.

        ``folder`` (default: ``args.upload``) may be a single file or a
        directory, which is traversed recursively. Every ``.eml`` and
        ``.emlx`` file is parsed, stripped of its attachments and then
        passed to ``handler``.
        """

        source = self.args.upload if folder is None else folder
        if os.path.isfile(source):
            filenames = [source]
        else:
            filenames = [os.path.join(source, name)
                         for name in os.listdir(source)]

        for i, filename in enumerate(filenames, start=1):
            # Recursive walker
            if os.path.isdir(filename):
                self.process_directory(handler, filename)
                continue

            # Only take eml files into account
            is_emlx = filename.lower().endswith(".emlx")
            if not is_emlx and not filename.lower().endswith(".eml"):
                continue

            logging.warning('Files\t\t: %d / %d', i, len(filenames))

            # Specific handling of emlx files
            if is_emlx:
                msg = src.emlx2eml.parse_emlx(filename)
            else:
                with open(filename) as filepointer:
                    msg = email.message_from_file(filepointer)

            # Logging
            msg_subject = self.get_subject(msg)
            logging.warning('    File\t: %s (%s)', filename, msg_subject)

            # Remove attachments
            self.download_and_detach_attachments(msg)

            # Post process message (e.g. upload or save it)
            handler(msg)

    @staticmethod
    def detach_attachment(msg, target):
        """Replace large attachment with dummy text.

        ``msg`` is the message part to rewrite in place, ``target`` the
        path under which its content was stored; only the base name of
        ``target`` ends up in the placeholder text.
        """

        # Get message details
        msg_content = msg.get_content_type()
        msg_filename = MailboxCleanerMessage.convert_filename(
            msg.get_filename())
        msg_size = len(str(msg)) / 1024
        msg_type = msg.get_content_disposition()

        logging.debug('      Detaching\t: %s', msg_filename)

        # Remove some old headers
        del msg['Content-Transfer-Encoding']
        del msg['Content-Disposition']
        del msg['Content-Description']
        # get_params() returns None if there is no Content-Type header
        for k, _v in (msg.get_params() or [])[1:]:
            msg.del_param(k)

        # Make sure different clients visualize the removed content properly
        msg.set_type('text/plain')
        msg.set_charset('utf-8')
        if msg_type == 'attachment':
            msg.add_header('Content-Disposition', 'inline')
        else:
            msg.add_header('Content-Disposition', 'attachment',
                           filename='removed-%s.txt' % msg_filename)
            msg.add_header('Content-Description',
                           'removed-%s.txt' % msg_filename)

        # Replace content
        msg_details = dict(newfile=os.path.basename(target),
                           type=msg_content,
                           filename=msg_filename,
                           size=msg_size)
        msg_placeholder = MailboxCleanerMessage._PLACEHOLDER % msg_details
        msg_placeholder = email.mime.text.MIMEText(msg_placeholder,
                                                   'text', 'utf-8')
        msg.set_payload(msg_placeholder.get_payload())

    @staticmethod
    def get_uid(message) -> str:
        """Get UID of message.

        Returns the address part of the 'Message-ID' header, or an empty
        string if the message has no such header.
        """

        parser = HeaderParser()
        header = parser.parsestr(message.as_string())
        message_id = header['message-id']
        if message_id is None:
            return ''
        return email.utils.parseaddr(message_id)[1]

    @staticmethod
    def _decode_header_value(value) -> str:
        """Decode an RFC 2047 header value into one string.

        All chunks of the header are decoded and joined. Bytes without a
        usable charset are decoded as UTF-8; undecodable bytes are
        replaced instead of raising. A header whose encoded words cannot
        be parsed is returned unchanged.
        """

        # Local imports: the module only imports other email submodules.
        from email.errors import HeaderParseError
        from email.header import decode_header

        try:
            chunks = decode_header(value)
        except HeaderParseError:
            return str(value)

        fragments = []
        for chunk, charset in chunks:
            if isinstance(chunk, bytes):
                try:
                    chunk = chunk.decode(charset or 'utf-8',
                                         errors='replace')
                except LookupError:  # charset unknown to Python
                    chunk = chunk.decode('utf-8', errors='replace')
            fragments.append(chunk)
        return ''.join(fragments)

    @staticmethod
    def get_subject(message) -> str:
        """Get shortened message subject for visualization.

        The subject is decoded, cut after 75 characters (marked with
        '...'), line breaks are removed and tabs become spaces. Messages
        without a subject yield "unknown".
        """

        if 'subject' in message:
            subject = message['subject']
        else:
            subject = "unknown"  # very rarely messages have no subject
        subject = MailboxCleanerMessage._decode_header_value(subject)
        subject = subject[:75] + (subject[75:] and '...')
        subject = subject.replace('\r', '').replace('\n', '')
        subject = subject.replace('\t', ' ')

        return subject

    @staticmethod
    def get_hash(filename: str) -> str:
        """Get hash from filename to detect duplicates.

        Returns the hexadecimal SHA-256 digest of the file content.
        """

        hash_value = hashlib.sha256()
        with open(filename, "rb") as file:
            for byte_block in iter(lambda: file.read(4096), b""):
                hash_value.update(byte_block)
        return hash_value.hexdigest()

    @staticmethod
    def slugify_filename(value):
        """Make sure attachments contain only valid characters.

        The value is NFKC-normalized and every character that is not a
        dot, word character, whitespace or hyphen becomes an underscore.
        """

        value = str(value)
        value = unicodedata.normalize('NFKC', value)
        value = re.sub(r'[^.\w\s-]', '_', value)
        return value

    @staticmethod
    def convert_filename(file_struct) -> str:
        """Decode the name of some attachments.

        ``file_struct`` is the raw (possibly RFC 2047 encoded) file name
        or None. Returns a slugified file name, or "unknown" if there is
        no name or nothing usable is left of it.
        """

        if file_struct is None:
            return MailboxCleanerMessage.slugify_filename('unknown')

        filename = MailboxCleanerMessage._decode_header_value(file_struct)
        filename = filename.replace("\r", "").replace("\n", "")
        filename = MailboxCleanerMessage.slugify_filename(filename)

        # Names such as "", "." or ".." do not denote a file inside the
        # target directory, so treat them like a missing name.
        if re.fullmatch(r'[.\s]*', filename):
            return 'unknown'

        return filename
289