Passed
Push — main ( 988563...f46350 )
by Alexander
01:32
created

MailboxCleanerMessage.download_and_detach_attachments()   A

Complexity

Conditions 4

Size

Total Lines 15
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 10
nop 2
dl 0
loc 15
rs 9.9
c 0
b 0
f 0
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
"""
5
Module to download and to detach/strip/remove attachments
6
from e-mails on IMAP servers.
7
"""
8
9
from __future__ import print_function
10
11
import email
12
import email.mime.text
13
import email.utils
14
from email.parser import HeaderParser
15
import hashlib
16
import logging
17
import os.path
18
import re
19
import shutil
20
import tempfile
21
import unicodedata
22
23
24
# pylint: disable=R0801
25
__author__ = "Alexander Willner"
26
__copyright__ = "Copyright 2020, Alexander Willner"
27
__credits__ = ["github.com/guido4000",
28
               "github.com/halteproblem", "github.com/jamesridgway"]
29
__license__ = "MIT"
30
__version__ = "1.0.0"
31
__maintainer__ = "Alexander Willner"
32
__email__ = "[email protected]"
33
__status__ = "Development"
34
35
36
class MailboxCleanerMessage():
    """
    Class to represent an e-mail.

    Bundles the logic to save attachments of a message to a local
    directory and to replace them inside the message with a short
    placeholder text.

    The ``args`` object handed to the constructor is read for the
    attributes ``max_size`` (size threshold in KB), ``skip_download``,
    ``target`` (download directory) and ``upload`` (directory with
    ``.eml``/``.emlx`` files).
    """

    # Text that replaces the payload of a detached attachment.
    _PLACEHOLDER = """
===========================================================
This message contained an attachment that was stripped out.
The filename was: "%(filename)s".
The size was: %(size)d KB.
The type was: %(type)s.
Tool: https://github.com/AlexanderWillner/MailboxCleanup
===========================================================
"""

    def __init__(self, args):
        """Store the settings object (see class docstring)."""

        self.args = args

    def download_and_detach_attachments(self, msg):
        """Download attachments and remove them from the mail.

        Walks over all parts of ``msg``; every detachable part that
        could be downloaded is replaced in place by a placeholder.

        Returns:
            True if at least one part of ``msg`` was modified.
        """

        modified = False

        for part in msg.walk():
            if self.is_non_detachable_part(part):
                continue
            # Only strip what was stored successfully, never lose data.
            if self.download_attachment(part):
                self.detach_attachment(part)
                modified = True

        return modified

    def is_non_detachable_part(self, part):
        """Only process certain types and sizes of attachments.

        Returns:
            True if the part is a multipart container, has no
            Content-Disposition header or is not larger than
            ``args.max_size`` KB (measured on its serialized form).
        """

        maintype = part.get_content_maintype()
        msg_size = len(str(part)) / 1024
        logging.debug('    Part\t: %d KB / %d KB (type: %s)',
                      msg_size, self.args.max_size, maintype)

        return (maintype == 'multipart'
                or part.get('Content-Disposition') is None
                or msg_size <= self.args.max_size)

    def download_attachment(self, part) -> bool:
        """Download the attachment from a part of an email.

        Returns:
            True if the attachment was stored in ``args.target`` (or
            if downloading is disabled via ``args.skip_download``),
            False if the part has no usable filename or no payload.
        """

        if self.args.skip_download:
            logging.info('    Downloading\t: skipped (disabled)')
            return True

        file_attached = self.convert_filename(part.get_filename())

        if file_attached == "unknown":
            logging.warning('Warning\t: Unknown attachment '
                            '(skipping this attachment)')
            return False

        payload = part.get_payload(decode=True)
        if payload is None:
            # E.g. message/rfc822 parts: nothing that could be written.
            logging.warning('Warning\t: Attachment without payload '
                            '(skipping this attachment)')
            return False

        os.makedirs(self.args.target, exist_ok=True)
        with tempfile.NamedTemporaryFile() as file_temp:
            logging.info('    Downloading\t: "%s" (%s)',
                         file_attached, part.get_content_maintype())
            logging.debug('    Downloading\t: To "%s"', file_temp.name)
            file_temp.write(payload)
            # The file is copied by name below, so buffered data has
            # to be on disk first; otherwise the copy is truncated.
            file_temp.flush()
            self._copy_file(file_temp.name, file_attached)

        return True

    def _copy_file(self, source, target_name, iterator=0):
        """Copy file, check for duplicates via hash value.

        If a file with the target name already exists and has the same
        content, nothing is copied. If the content differs, "-1", "-2",
        ... is appended to the base name until a free name or an
        identical file is found.

        Args:
            source: Path of the file to copy.
            target_name: File name (without directory) to store it as.
            iterator: Number of the first suffix to try (0: no suffix).
        """

        target_base, target_extension = os.path.splitext(target_name)
        source_hash = None

        while True:
            suffix = "-" + str(iterator) if iterator > 0 else ""
            target = os.path.join(self.args.target,
                                  target_base + suffix + target_extension)
            if iterator == 0:
                logging.debug('    Moving\t: From "%s" to "%s".',
                              source, target)

            if not os.path.isfile(target):
                shutil.copy2(source, target)
                return

            if source_hash is None:
                source_hash = MailboxCleanerMessage.get_hash(source)
            if source_hash == MailboxCleanerMessage.get_hash(target):
                logging.debug('    Moving\t: Already exists (same hash)')
                return

            if iterator == 0:
                logging.debug(
                    '    Conflict\t: Resolving same file / other hash...')
            iterator += 1

    def process_directory(self, handler):
        """Upload messages from a local directory.

        Every ``.eml``/``.emlx`` file in ``args.upload`` is parsed, its
        attachments are downloaded and detached, and the resulting
        message object is passed to ``handler``.

        Args:
            handler: Callable taking the processed message
                (e.g. to upload or to save it).
        """

        directory = self.args.upload
        filenames = os.listdir(directory)

        for i, filename in enumerate(filenames, start=1):
            logging.warning('Progress\t: %d / %d', i, len(filenames))
            if not filename.lower().endswith((".eml", ".emlx")):
                continue

            filename = os.path.join(directory, filename)
            # Parse the raw bytes: mails are not guaranteed to be valid
            # text in the locale's encoding.
            with open(filename, "rb") as filepointer:
                if filename.lower().endswith(".emlx"):
                    # Skip the first line that precedes the message.
                    next(filepointer, None)
                msg = email.message_from_binary_file(filepointer)

            msg_subject = self.get_subject(msg)
            logging.warning('    File\t: %s (%s)', filename, msg_subject)

            # Remove attachments
            self.download_and_detach_attachments(msg)

            # Post process message (e.g. upload or save it)
            handler(msg)

    @staticmethod
    def detach_attachment(msg):
        """Replace large attachment with dummy text.

        The part ``msg`` is modified in place: it becomes a
        ``text/plain`` part whose body names the original filename,
        size and content type.
        """

        # Get message details
        msg_content = msg.get_content_type()
        msg_filename = MailboxCleanerMessage.convert_filename(
            msg.get_filename())
        msg_size = len(str(msg)) / 1024
        msg_type = msg.get_content_disposition()

        # Remove some old headers
        del msg['Content-Transfer-Encoding']
        del msg['Content-Disposition']
        del msg['Content-Description']
        # get_params() returns None if there is no Content-Type header.
        for k, _v in (msg.get_params() or [])[1:]:
            msg.del_param(k)

        # Make sure different clients visualize the removed content properly
        msg.set_type('text/plain')
        msg.set_charset('utf-8')
        if msg_type == 'attachment':
            msg.add_header('Content-Disposition', 'inline')
        else:
            msg.add_header('Content-Disposition', 'attachment',
                           filename='removed-%s.txt' % msg_filename)
            msg.add_header('Content-Description',
                           'removed-%s.txt' % msg_filename)

        # Replace content
        msg_details = dict(type=msg_content,
                           filename=msg_filename,
                           size=msg_size)
        msg_placeholder = MailboxCleanerMessage._PLACEHOLDER % msg_details
        # MIMEText is only used to get the placeholder in the transfer
        # encoding that set_charset() announced above.
        msg_placeholder = email.mime.text.MIMEText(msg_placeholder,
                                                   'text', 'utf-8')
        msg.set_payload(msg_placeholder.get_payload())

    @staticmethod
    def get_uid(message) -> str:
        """Get UID of message.

        Returns:
            The address part of the Message-ID header, or an empty
            string if the message has no such header.
        """

        parser = HeaderParser()
        header = parser.parsestr(message.as_string())
        message_id = header['message-id']
        if message_id is None:
            return ''
        return email.utils.parseaddr(message_id)[1]

    @staticmethod
    def get_subject(message) -> str:
        """Get shortened message subject for visualization.

        Returns:
            The decoded subject, cut after 75 characters (marked with
            "..."), or "unknown" if there is no subject header.
        """

        if 'subject' in message:
            subject = message['subject']
        else:
            subject = "unknown"  # very rarely messages have no subject
        subject = MailboxCleanerMessage._decode_header_value(subject)
        subject = subject[:75] + (subject[75:] and '...')
        subject = subject.replace('\r\n', '')
        subject = subject.replace('\t', ' ')

        return subject

    @staticmethod
    def get_hash(filename: str) -> str:
        """Get hash from filename to detect duplicates.

        Returns:
            The hexadecimal SHA-256 digest of the file's content.
        """

        hash_value = hashlib.sha256()
        with open(filename, "rb") as file:
            for byte_block in iter(lambda: file.read(4096), b""):
                hash_value.update(byte_block)
        return hash_value.hexdigest()

    @staticmethod
    def slugify_filename(value):
        """Make sure attachments contain only valid characters.

        Every character that is not a dot, a word character,
        whitespace or a hyphen is replaced by an underscore.
        """

        value = str(value)
        value = unicodedata.normalize('NFKC', value)
        value = re.sub(r'[^.\w\s-]', '_', value)
        return value

    @staticmethod
    def convert_filename(file_struct) -> str:
        """Decode the name of some attachments.

        Args:
            file_struct: File name as found in the mail (may contain
                encoded words) or None.

        Returns:
            The decoded and slugified name; "unknown" if no name
            was given.
        """

        if not file_struct:
            return MailboxCleanerMessage.slugify_filename('unknown')

        filename = MailboxCleanerMessage._decode_header_value(file_struct)
        return MailboxCleanerMessage.slugify_filename(filename)

    @staticmethod
    def _decode_header_value(value) -> str:
        """Decode all fragments of a header value into one string."""

        try:
            fragments = email.header.decode_header(value)
        except email.errors.HeaderParseError:
            # Broken encoded word: better the raw text than a crash.
            return str(value)

        decoded = []
        for text, charset in fragments:
            if isinstance(text, bytes):
                try:
                    text = text.decode(charset or 'utf-8', errors='replace')
                except LookupError:  # charset unknown to Python
                    text = text.decode('utf-8', errors='replace')
            decoded.append(text)
        return ''.join(decoded)
255