Completed
Push — main ( d0c132...3e91f8 )
by Alexander
01:56
created

src.mailbox_message   B

Complexity

Total Complexity 50

Size/Duplication

Total Lines 319
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 209
dl 0
loc 319
rs 8.4
c 0
b 0
f 0
wmc 50

13 Methods

Rating   Name   Duplication   Size   Complexity  
A MailboxCleanerMessage.__init__() 0 2 1
A MailboxCleanerMessage.slugify_filename() 0 8 1
A MailboxCleanerMessage.get_header() 0 19 5
B MailboxCleanerMessage.download_attachment() 0 28 6
A MailboxCleanerMessage.convert_filename() 0 15 3
A MailboxCleanerMessage.get_uid() 0 8 1
B MailboxCleanerMessage._copy_file() 0 26 6
A MailboxCleanerMessage.get_hash() 0 9 4
A MailboxCleanerMessage.download_and_detach_attachments() 0 21 5
A MailboxCleanerMessage.detach_attachment() 0 41 3
A MailboxCleanerMessage.get_subject() 0 10 1
A MailboxCleanerMessage.is_non_detachable_part() 0 13 1
D MailboxCleanerMessage.process_directory() 0 55 13

How to fix   Complexity   

Complexity

Complex classes like src.mailbox_message often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
"""
5
Module to download and to detach/strip/remove attachments
6
from e-mails on IMAP servers.
7
"""
8
9
from __future__ import print_function

import email
import email.header
import email.mime.text
import email.utils
import hashlib
import logging
import os.path
import re
import shutil
import tempfile
import time
import typing
import unicodedata

import src.emlx2eml
23
24
# pylint: disable=R0801

# Module metadata (authorship, licensing and release information).
__author__ = "Alexander Willner"
__copyright__ = "Copyright 2020, Alexander Willner"
__credits__ = ["github.com/guido4000",
               "github.com/halteproblem", "github.com/jamesridgway"]
__license__ = "MIT"
__version__ = "1.0.4"
__maintainer__ = "Alexander Willner"
# NOTE(review): the value below looks like a placeholder left behind by
# e-mail obfuscation rather than a real address -- confirm and restore
# the maintainer address from the upstream repository.
__email__ = "[email protected]"
__status__ = "Development"
34
35
36
class MailboxCleanerMessage:
    """
    Class to represent an e-mail.

    Bundles the helpers used to download attachments of a parsed
    ``email.message.Message`` to a local directory and to replace them
    inside the message with a short textual placeholder.

    The ``args`` object passed to the constructor is read for the
    attributes ``detach``, ``min_size``, ``skip_download``, ``target``,
    ``upload`` and ``folder``.
    """

    # Text that replaces the payload of a detached attachment.
    _PLACEHOLDER = """
===========================================================
This message contained an attachment that was stripped out.
The attachment was stored using the file name: "%(newfile)s".
The original file name was: "%(filename)s".
The original size was: %(size)d KB.
The original type was: %(type)s.
Tool: https://mailboxcleanup.netcee.de
===========================================================
"""

    def __init__(self, args):
        """Store the options object that configures all operations."""
        self.args = args

    def download_and_detach_attachments(self, msg):
        """Download attachments and remove them from the mail.

        Walks over all parts of ``msg``; every detachable part is
        downloaded and, if ``args.detach`` is set, replaced in place by
        the placeholder text.

        Returns True if at least one part of ``msg`` was modified.
        """
        modified = False
        date = None  # resolved lazily, only if there is something to save

        # Iterate over each part of the email
        for part in msg.walk():
            if self.is_non_detachable_part(part):
                continue

            if date is None:
                date = self._get_timestamp(msg)

            # Only download in relevant mode
            target = self.download_attachment(part, date)
            if target is None:
                continue

            # Only detach in relevant mode
            if not self.args.detach:
                logging.debug('      Detaching\t: skipped (disabled)')
                continue

            self.detach_attachment(part, target)
            modified = True

        return modified

    @staticmethod
    def _get_timestamp(msg) -> float:
        """Return the 'Date' header of ``msg`` as a local timestamp.

        Falls back to the current time when the header is missing or
        cannot be parsed, so that one malformed message does not abort
        the whole run with a TypeError.
        """
        raw_date = msg.get('date')
        parsed = email.utils.parsedate(str(raw_date)) if raw_date else None
        if parsed is not None:
            try:
                return time.mktime(parsed)
            except (OverflowError, ValueError):
                pass  # date outside the platform's range: use fallback

        logging.debug('      Date\t: missing or invalid (using current time)')
        return time.time()

    def is_non_detachable_part(self, part):
        """Only process certain types and sizes of attachments.

        A part is non-detachable if it is a multipart container, has no
        'Content-Disposition' header or its serialized size in KB does
        not exceed ``args.min_size``.
        """
        msg_size = len(str(part)) / 1024
        maintype = part.get_content_maintype()
        logging.debug('    Part\t: %d KB / %d KB (type: %s)',
                      msg_size, self.args.min_size, maintype)

        non_detachable = maintype == 'multipart' or \
            part.get('Content-Disposition') is None or \
            msg_size <= self.args.min_size
        logging.debug('    Non-Det.\t: %s', non_detachable)

        return non_detachable

    def download_attachment(self, part, date) -> typing.Optional[str]:
        """Download the attachment from a part of an email.

        ``date`` is the timestamp applied as access/modification time
        of the stored file.

        Returns the path of the stored file, an empty string if
        downloading is disabled via ``args.skip_download`` (callers may
        still detach the part), or None if the part has no usable file
        name or no decodable payload.
        """
        if self.args.skip_download:
            logging.info('      Downl.\t: skipped (disabled)')
            return ""

        file_attached = self.convert_filename(part.get_filename())

        if file_attached == "unknown":
            logging.warning('Warning\t: Unknown attachment '
                            '(skipping this attachment)')
            return None

        # makedirs also creates missing parent directories and does not
        # fail if the directory appeared in the meantime.
        os.makedirs(self.args.target, exist_ok=True)

        with tempfile.NamedTemporaryFile() as file_temp:
            logging.info('      Downl.\t: "%s" (%s)',
                         file_attached, part.get_content_maintype())
            logging.debug('      Downl.\t: To "%s"', file_temp.name)
            payload = part.get_payload(decode=True)
            if payload is None:
                return None
            file_temp.write(payload)
            # Flush so the copy below sees the complete content.
            file_temp.flush()
            target = self._copy_file(file_temp.name, file_attached, date)

        return target

    def _copy_file(self, source, target_name, date, iterator=0) -> str:
        """Copy file, check for duplicates via hash value.

        Copies ``source`` to ``args.target``/``target_name``. If a file
        with that name and the same content already exists it is
        reused; if the content differs, "-1", "-2", ... is appended to
        the base name until a free or identical file is found.

        ``iterator`` is the numeric suffix to start with (0 = none).
        Returns the path of the file that holds the content.
        """
        target_base, target_extension = os.path.splitext(target_name)
        source_hash = None  # computed once, only if a name clash occurs

        while True:
            suffix = "-" + str(iterator) if iterator > 0 else ""
            target = os.path.join(self.args.target,
                                  target_base + suffix + target_extension)
            if iterator == 0:
                logging.debug('      Moving\t: From "%s" to "%s".',
                              source, target)

            if not os.path.isfile(target):
                shutil.copy2(source, target)
                os.utime(target, (date, date))
                return target

            if source_hash is None:
                source_hash = MailboxCleanerMessage.get_hash(source)
            if source_hash == MailboxCleanerMessage.get_hash(target):
                logging.debug('      Moving\t: Already exists (same hash)')
                return target

            if iterator == 0:
                logging.debug(
                    '      Conflict\t: Resolving same file / other hash..')
            iterator += 1

    def process_directory(self, handler, folder=None, cache=None):
        """Upload messages from a local directory.

        ``folder`` (default: ``args.upload``) may be a single file or a
        directory, which is processed recursively. Every ``.eml`` /
        ``.emlx`` file is parsed, stripped of its attachments and
        passed to ``handler(msg, args.folder)``. Messages whose UID is
        contained in ``cache`` are skipped.
        """
        source = self.args.upload if folder is None else folder
        filenames = self._list_candidates(source)

        for i, filename in enumerate(filenames, start=1):
            # Recursive walker
            if os.path.isdir(filename):
                self.process_directory(handler, filename, cache)
                continue

            # Only take eml files into account
            if not filename.lower().endswith((".eml", ".emlx")):
                continue

            logging.warning('Files\t\t: %d / %d', i, len(filenames))
            msg = self._load_message(filename)

            # Logging
            msg_subject = self.get_subject(msg)
            msg_uid = self.get_uid(msg)
            logging.warning('    File\t: %s (%s: %s)',
                            filename, msg_uid, msg_subject)
            if cache is not None and msg_uid in cache:
                logging.warning('    Cache\t: OK')
                continue

            logging.warning('    Cache\t: MISS')

            try:
                # Remove attachments
                self.download_and_detach_attachments(msg)

                # Post process message (e.g. upload or save it)
                handler(msg, self.args.folder)
            except (KeyError, UnicodeEncodeError) as error:
                logging.debug('      Error\t: %s (in %s)', error, filename)

    @staticmethod
    def _list_candidates(source):
        """Return the full paths to inspect for a file or a directory."""
        if os.path.isfile(source):
            return [source]
        return [os.path.join(source, name) for name in os.listdir(source)]

    @staticmethod
    def _load_message(filename):
        """Parse an .eml or .emlx file into a message object."""
        # Specific handling of emlx files
        if filename.lower().endswith(".emlx"):
            return src.emlx2eml.parse_emlx(filename)

        # surrogateescape keeps undecodable bytes instead of failing.
        with open(filename,
                  encoding="utf8",
                  errors="surrogateescape") as filepointer:
            return email.message_from_file(filepointer)

    @staticmethod
    def detach_attachment(msg, target):
        """Replace large attachment with dummy text.

        Rewrites the message part ``msg`` in place: its headers are
        changed to describe a UTF-8 plain text part and its payload is
        replaced by the placeholder text, which mentions the base name
        of ``target`` as well as the original name, size and type.
        """
        # Get message details
        msg_content = msg.get_content_type()
        msg_filename = MailboxCleanerMessage.convert_filename(
            msg.get_filename())
        msg_size = len(str(msg)) / 1024
        msg_type = msg.get_content_disposition()

        logging.debug('      Detaching\t: %s (saved as %s)',
                      msg_filename, target)

        # Remove some old headers
        del msg['Content-Transfer-Encoding']
        del msg['Content-Disposition']
        del msg['Content-Description']
        # get_params() returns None without a Content-Type header; the
        # first entry is the content type itself and is kept.
        for k, _v in (msg.get_params() or [])[1:]:
            msg.del_param(k)

        # Make sure different clients visualize the removed content properly
        msg.set_type('text/plain')
        msg.set_charset('utf-8')
        if msg_type == 'attachment':
            msg.add_header('Content-Disposition', 'inline')
        else:
            msg.add_header('Content-Disposition', 'attachment',
                           filename='removed-%s.txt' % msg_filename)
            msg.add_header('Content-Description',
                           'removed-%s.txt' % msg_filename)

        # Replace content
        msg_details = dict(newfile=os.path.basename(target),
                           type=msg_content,
                           filename=msg_filename,
                           size=msg_size)
        msg_placeholder = MailboxCleanerMessage._PLACEHOLDER % msg_details
        # MIMEText is only used to obtain the transfer-encoded payload.
        msg_placeholder = email.mime.text.MIMEText(msg_placeholder,
                                                   'text', 'utf-8')
        msg.set_payload(msg_placeholder.get_payload())

    @staticmethod
    def get_uid(message) -> str:
        """Get UID of message (its 'Message-ID' without brackets)."""
        uid = MailboxCleanerMessage.get_header(message, 'message-id')
        uid = email.utils.parseaddr(uid)[1]

        return uid

    @staticmethod
    def _decode_header_value(value) -> str:
        """Decode a possibly RFC 2047 encoded header value.

        All chunks of the value are decoded and concatenated. Chunks
        without charset are treated as UTF-8, undecodable bytes are
        replaced and unknown charsets fall back to ASCII.
        """
        decoded = []
        for chunk, charset in email.header.decode_header(value):
            if not hasattr(chunk, 'decode'):
                decoded.append(chunk)  # already text
                continue
            charset = 'utf-8' if charset is None else charset
            try:
                decoded.append(chunk.decode(charset, errors='replace'))
            except LookupError as error:
                logging.debug('      Error\t: decoding (%s) with (%s): %s',
                              chunk, charset, error)
                decoded.append(chunk.decode('ascii', 'replace'))

        return ''.join(decoded)

    @staticmethod
    def get_header(message, header: str) -> str:
        """Get a header field.

        Returns the decoded value of ``header`` or an empty string if
        the message has no such header.
        """
        item = message[header] if header in message else ""

        return MailboxCleanerMessage._decode_header_value(item)

    @staticmethod
    def get_subject(message) -> str:
        """Get shortened message subject for visualization."""
        subject = MailboxCleanerMessage.get_header(message, 'subject')
        # Keep 75 characters; mark longer subjects with an ellipsis.
        subject = subject[:75] + (subject[75:] and '...')
        subject = subject.replace('\r\n', '')
        subject = subject.replace('\t', ' ')

        return subject

    @staticmethod
    def get_hash(filename: str) -> str:
        """Get hash from filename to detect duplicates.

        Returns the hexadecimal SHA-256 digest of the file content.
        """
        hash_value = hashlib.sha256()
        with open(filename, "rb") as file:
            # Read in blocks to keep memory usage constant.
            for byte_block in iter(lambda: file.read(4096), b""):
                hash_value.update(byte_block)
        return hash_value.hexdigest()

    @staticmethod
    def slugify_filename(value):
        """Make sure attachments contain only valid characters.

        Everything except dots, word characters, whitespace and hyphens
        (path separators in particular) is replaced by an underscore.
        """
        value = str(value)
        value = unicodedata.normalize('NFKC', value)
        value = re.sub(r'[^.\w\s-]', '_', value)
        return value

    @staticmethod
    def convert_filename(file_struct) -> str:
        """Decode the name of some attachments.

        Returns a sanitized file name, or 'unknown' if no name is given
        or the name consists only of dots and whitespace (such names
        would resolve to a directory instead of a file).
        """
        if file_struct is None:
            return 'unknown'

        filename = MailboxCleanerMessage._decode_header_value(file_struct)
        filename = filename.replace("\r", "").replace("\n", "")
        filename = MailboxCleanerMessage.slugify_filename(filename)

        # Also true for the empty string.
        if all(char == '.' or char.isspace() for char in filename):
            return 'unknown'

        return filename
319