MailboxCleanerMessage.convert_filename()   A
last analyzed

Complexity

Conditions 5

Size

Total Lines 18
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 14
nop 1
dl 0
loc 18
rs 9.2333
c 0
b 0
f 0
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
"""
5
Module to download and to detach/strip/remove attachments
6
from e-mails on IMAP servers.
7
"""
8
9
from __future__ import print_function
10
11
import email
12
import email.mime.text
13
import email.utils
14
import hashlib
15
import logging
16
import os.path
17
import re
18
import shutil
19
import tempfile
20
import time
21
import unicodedata
22
import src.emlx2eml
23
24
# pylint: disable=R0801
25
__author__ = "Alexander Willner"
26
__copyright__ = "Copyright 2020, Alexander Willner"
27
__credits__ = [
28
    "github.com/guido4000",
29
    "github.com/halteproblem",
30
    "github.com/jamesridgway",
31
]
32
__license__ = "MIT"
33
__version__ = "1.0.4"
34
__maintainer__ = "Alexander Willner"
35
__email__ = "[email protected]"
36
__status__ = "Development"
37
38
39
class MailboxCleanerMessage:
    """
    Class to represent an e-mail.

    Bundles the logic to store attachments of a message on disk, to replace
    them inside the message by a short text placeholder and to feed
    ``.eml``/``.emlx`` files from a local directory to a handler.
    """

    # Text that replaces the payload of a detached attachment.
    _PLACEHOLDER = """
===========================================================
This message contained an attachment that was stripped out.
The attachment was stored using the file name: "%(newfile)s".
The original file name was: "%(filename)s".
The original size was: %(size)d KB.
The original type was: %(type)s.
Tool: https://mailboxcleanup.netcee.de
===========================================================
"""

    def __init__(self, args):
        """Store the settings object.

        The methods of this class read the attributes ``detach``,
        ``min_size``, ``skip_download``, ``target``, ``upload`` and
        ``folder`` from ``args``.
        """
        self.args = args

    def download_and_detach_attachments(self, msg):
        """Download attachments and remove them from the mail.

        Returns True if at least one part of ``msg`` was replaced by the
        placeholder text, False otherwise.
        """

        modified = False
        date = None  # resolved lazily, at most once per message

        # Iterate over each part of the email
        for part in msg.walk():
            if self.is_non_detachable_part(part):
                continue

            if date is None:
                date = self._message_timestamp(msg)

            # Only download in relevant mode
            target = self.download_attachment(part, date)
            if target is None:
                continue

            # Only detach in relevant mode
            if not self.args.detach:
                logging.debug("      Detaching\t: skipped (disabled)")
                continue

            self.detach_attachment(part, target)
            modified = True

        return modified

    @staticmethod
    def _message_timestamp(msg) -> float:
        """Return the Date header of ``msg`` as a local-time timestamp.

        Falls back to the current time when the header is missing or
        cannot be converted, so a broken Date header does not abort the
        processing of the message.
        """

        raw_date = msg.get("date")
        try:
            parsed = email.utils.parsedate(str(raw_date)) if raw_date else None
            if parsed is not None:
                return time.mktime(parsed)
        except (TypeError, ValueError, OverflowError) as error:
            logging.debug("      Error\t: invalid date (%s): %s",
                          raw_date, error)

        logging.debug("      Date\t: missing or invalid (using current time)")
        return time.time()

    @staticmethod
    def _part_size_kb(part):
        """Return the size of the serialized ``part`` in KB.

        Serializing a part can raise a LookupError for unknown charsets
        (e.g. "unknown encoding: windows-1251"); in that case the length of
        the raw, still encoded payload is used as an approximation.
        """

        try:
            return len(str(part)) / 1024
        except LookupError as error:
            logging.debug("      Error\t: sizing part failed: %s", error)
            payload = part.get_payload()
            # Multipart payloads are lists; they are never detached anyway.
            return len(payload) / 1024 if isinstance(payload, str) else 0

    def is_non_detachable_part(self, part):
        """Only process certain types and sizes of attachments.

        A part is non-detachable if it is a multipart container, has no
        Content-Disposition header or is not larger than
        ``self.args.min_size`` (in KB).
        """

        msg_size = self._part_size_kb(part)
        logging.debug(
            "    Part\t: %d KB / %d KB (type: %s)",
            msg_size,
            self.args.min_size,
            part.get_content_maintype(),
        )

        non_detachable = (
            part.get_content_maintype() == "multipart"
            or part.get("Content-Disposition") is None
            or msg_size <= self.args.min_size
        )
        logging.debug("    Non-Det.\t: %s", non_detachable)

        return non_detachable

    def download_attachment(self, part, date) -> str:
        """Download the attachment from a part of an email.

        ``date`` is the timestamp that is applied to the stored file.

        Returns the path of the stored file, an empty string if downloading
        is disabled via ``self.args.skip_download`` and None if the part
        has no usable file name or no decodable payload.
        """

        if self.args.skip_download:
            logging.info("      Downl.\t: skipped (disabled)")
            return ""

        file_attached = self.convert_filename(part.get_filename())

        if file_attached == "unknown":
            logging.warning(
                "Warning\t: Unknown attachment (skipping this attachment)"
            )
            return None

        # Also creates missing parent directories of the target folder.
        os.makedirs(self.args.target, exist_ok=True)

        with tempfile.NamedTemporaryFile() as file_temp:
            logging.info(
                '      Downl.\t: "%s" (%s)',
                file_attached,
                part.get_content_maintype(),
            )
            logging.debug('      Downl.\t: To "%s"', file_temp.name)
            payload = part.get_payload(decode=True)
            if payload is None:
                return None
            file_temp.write(payload)
            # Make sure the content is on disk before it is copied by name.
            file_temp.flush()
            target = self._copy_file(file_temp.name, file_attached, date)

        return target

    def _copy_file(self, source, target_name, date, iterator=0) -> str:
        """Copy file, check for duplicates via hash value.

        If a file called ``target_name`` already exists in the target
        folder and has the same hash it is reused. If the hash differs,
        "-1", "-2", ... is appended to the base name until a free name or
        an identical file is found. ``iterator`` is the first suffix to
        try (0 means no suffix).

        Returns the path of the file that holds the content of ``source``.
        """

        target_base, target_extension = os.path.splitext(target_name)
        source_hash = None  # computed once, only if a conflict occurs

        while True:
            suffix = "-" + str(iterator) if iterator > 0 else ""
            target = os.path.join(
                self.args.target, target_base + suffix + target_extension
            )
            if iterator == 0:
                logging.debug(
                    '      Moving\t: From "%s" to "%s".', source, target)

            if not os.path.isfile(target):
                shutil.copy2(source, target)
                os.utime(target, (date, date))
                return target

            if source_hash is None:
                source_hash = MailboxCleanerMessage.get_hash(source)
            if source_hash == MailboxCleanerMessage.get_hash(target):
                logging.debug("      Moving\t: Already exists (same hash)")
                return target

            if iterator == 0:
                logging.debug(
                    "      Conflict\t: Resolving same file / other hash.."
                )
            iterator += 1

    def process_directory(self, handler, folder=None, cache=None):
        """Upload messages from a local directory.

        ``folder`` (default: ``self.args.upload``) may be a single file or
        a directory, which is walked recursively. Every ``.eml``/``.emlx``
        file is parsed, stripped from its attachments and handed over to
        ``handler(msg, self.args.folder)``. Messages whose UID is contained
        in ``cache`` are skipped.
        """

        source = self.args.upload if folder is None else folder
        if os.path.isfile(source):
            filenames = [source]
        else:
            filenames = [
                os.path.join(source, name) for name in os.listdir(source)
            ]

        for i, filename in enumerate(filenames, start=1):
            # Recursive walker
            if os.path.isdir(filename):
                self.process_directory(handler, filename, cache)
                continue

            # Only take eml files into account
            if not filename.lower().endswith((".eml", ".emlx")):
                continue

            logging.warning("Files\t\t: %d / %d", i, len(filenames))
            self._process_file(handler, filename, cache)

    def _process_file(self, handler, filename, cache):
        """Parse one message file, strip it and pass it to ``handler``."""

        # Specific handling of emlx files
        if filename.lower().endswith(".emlx"):
            msg = src.emlx2eml.parse_emlx(filename)
        else:
            with open(
                filename, encoding="utf8", errors="surrogateescape"
            ) as filepointer:
                msg = email.message_from_file(filepointer)

        # Logging
        msg_subject = self.get_subject(msg)
        msg_uid = self.get_uid(msg)
        logging.warning(
            "    File\t: %s (%s: %s)", filename, msg_uid, msg_subject
        )
        if cache is not None and msg_uid in cache:
            logging.warning("    Cache\t: OK")
            return

        logging.warning("    Cache\t: MISS")

        try:
            # Remove attachments
            self.download_and_detach_attachments(msg)

            # Post process message (e.g. upload or save it)
            handler(msg, self.args.folder)
        except (KeyError, UnicodeEncodeError) as error:
            logging.debug("      Error\t: %s (in %s)", error, filename)

    @staticmethod
    def detach_attachment(msg, target):
        """Replace large attachment with dummy text.

        ``msg`` is the message part to modify in place and ``target`` the
        path under which its content was stored; only the base name of
        ``target`` is mentioned in the placeholder text.
        """

        # Get message details
        msg_content = msg.get_content_type()
        msg_filename = MailboxCleanerMessage.convert_filename(
            msg.get_filename())
        msg_size = MailboxCleanerMessage._part_size_kb(msg)
        msg_type = msg.get_content_disposition()

        logging.debug("      Detaching\t: %s (saved as %s)",
                      msg_filename, target)

        # Remove some old headers
        del msg["Content-Transfer-Encoding"]
        del msg["Content-Disposition"]
        del msg["Content-Description"]
        # get_params() returns None if there is no Content-Type header; the
        # first entry is the content type itself and must be kept.
        for k, _v in (msg.get_params() or [])[1:]:
            msg.del_param(k)

        # Make sure different clients visualize the removed content properly
        msg.set_type("text/plain")
        msg.set_payload("")
        msg.set_charset("utf-8")
        if msg_type == "attachment":
            msg.add_header("Content-Disposition", "inline")
        else:
            msg.add_header(
                "Content-Disposition",
                "attachment",
                filename="removed-%s.txt" % msg_filename,
            )
            msg.add_header("Content-Description",
                           "removed-%s.txt" % msg_filename)

        # Replace content
        msg_details = dict(
            newfile=os.path.basename(target),
            type=msg_content,
            filename=msg_filename,
            size=msg_size,
        )
        msg_placeholder = MailboxCleanerMessage._PLACEHOLDER % msg_details
        # MIMEText is only used to get the transfer-encoded payload text.
        msg_placeholder = email.mime.text.MIMEText(
            msg_placeholder, "text", "utf-8")
        msg.set_payload(msg_placeholder.get_payload())

    @staticmethod
    def get_uid(message) -> str:
        """Get UID of message (the address part of its Message-ID)."""

        uid = MailboxCleanerMessage.get_header(message, "message-id")
        uid = email.utils.parseaddr(uid)[1]

        return uid

    @staticmethod
    def _decode_chunk(chunk, charset) -> str:
        """Decode one ``(chunk, charset)`` pair of a decoded header.

        Text chunks are returned as they are. Byte chunks are decoded with
        ``charset`` (UTF-8 if None); undecodable bytes are replaced and an
        unknown charset falls back to UTF-8 instead of raising.
        """

        if isinstance(chunk, str):
            return chunk
        try:
            return chunk.decode(charset or "utf-8", errors="replace")
        except LookupError as error:
            logging.debug(
                "      Error\t: decoding (%s) with (%s): %s",
                chunk, charset, error
            )
            return chunk.decode("utf-8", errors="replace")

    @staticmethod
    def _decode_mime_words(value) -> str:
        """Decode all encoded words of a header value into one string.

        If the value cannot be parsed at all it is returned unchanged
        (converted to ``str``).
        """

        try:
            chunks = email.header.decode_header(value)
        except email.errors.HeaderParseError as error:
            logging.debug("      Error\t: parsing (%s): %s", value, error)
            return str(value)

        return "".join(
            MailboxCleanerMessage._decode_chunk(chunk, charset)
            for chunk, charset in chunks
        )

    @staticmethod
    def get_header(message, header: str) -> str:
        """Get a header field.

        Returns the completely decoded value of ``header`` or an empty
        string if ``message`` does not contain the header.
        """

        item = message[header] if header in message else ""

        return MailboxCleanerMessage._decode_mime_words(item)

    @staticmethod
    def get_subject(message) -> str:
        """Get shortened message subject for visualization."""

        subject = MailboxCleanerMessage.get_header(message, "subject")
        # Cut after 75 characters and mark the cut with an ellipsis.
        subject = subject[:75] + (subject[75:] and "...")
        subject = subject.replace("\r\n", "")
        subject = subject.replace("\t", " ")

        return subject

    @staticmethod
    def get_hash(filename: str) -> str:
        """Get hash from filename to detect duplicates.

        Returns the hexadecimal SHA-256 digest of the content of the file.
        """

        hash_value = hashlib.sha256()
        with open(filename, "rb") as file:
            # Read in blocks to keep the memory usage low for large files.
            for byte_block in iter(lambda: file.read(4096), b""):
                hash_value.update(byte_block)
        return hash_value.hexdigest()

    @staticmethod
    def slugify_filename(value):
        """Make sure attachments contain only valid characters.

        Every character that is not a dot, a word character, whitespace or
        a hyphen is replaced by an underscore.
        """

        value = str(value)
        value = unicodedata.normalize("NFKC", value)
        value = re.sub(r"[^.\w\s-]", "_", value)
        return value

    @staticmethod
    def convert_filename(file_struct) -> str:
        """Decode the name of some attachments.

        Returns "unknown" if ``file_struct`` is None, otherwise the decoded
        and slugified file name. Broken charsets or bytes never raise; the
        affected characters are replaced instead.
        """

        if file_struct is None:
            filename = "unknown"
        else:
            filename = MailboxCleanerMessage._decode_mime_words(file_struct)
        filename = filename.replace("\r", " ").replace("\n", " ")

        return MailboxCleanerMessage.slugify_filename(filename)
353