Passed
Push — main ( 242f40...8b1e40 )
by Alexander
01:29
created

src.mailbox_message   A

Complexity

Total Complexity 41

Size/Duplication

Total Lines 264
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 172
dl 0
loc 264
rs 9.1199
c 0
b 0
f 0
wmc 41

12 Methods

Rating   Name   Duplication   Size   Complexity  
A MailboxCleanerMessage.slugify_filename() 0 8 1
A MailboxCleanerMessage.__init__() 0 2 1
B MailboxCleanerMessage.download_attachment() 0 34 7
A MailboxCleanerMessage.convert_filename() 0 14 3
A MailboxCleanerMessage.get_uid() 0 8 1
B MailboxCleanerMessage._copy_file() 0 22 6
A MailboxCleanerMessage.get_hash() 0 9 4
A MailboxCleanerMessage.download_and_detach_attachments() 0 15 4
A MailboxCleanerMessage.detach_attachment() 0 37 3
A MailboxCleanerMessage.get_subject() 0 17 4
A MailboxCleanerMessage.is_non_detachable_part() 0 11 1
B MailboxCleanerMessage.process_directory() 0 25 6

How to fix   Complexity   

Complexity

Complex classes like src.mailbox_message often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
"""
5
Module to download and to detach/strip/remove attachments
6
from e-mails on IMAP servers.
7
"""
8
9
from __future__ import print_function
10
11
import email
12
import email.mime.text
13
import email.utils
14
from email.parser import HeaderParser
15
import hashlib
16
import logging
17
import os.path
18
import re
19
import shutil
20
import tempfile
21
import unicodedata
22
23
24
# pylint: disable=R0801
25
__author__ = "Alexander Willner"
26
__copyright__ = "Copyright 2020, Alexander Willner"
27
__credits__ = ["github.com/guido4000",
28
               "github.com/halteproblem", "github.com/jamesridgway"]
29
__license__ = "MIT"
30
__version__ = "1.0.0"
31
__maintainer__ = "Alexander Willner"
32
__email__ = "[email protected]"
33
__status__ = "Development"
34
35
36
class MailboxCleanerMessage():
    """
    Class to represent an e-mail.

    Bundles the helpers needed to save attachments of a parsed
    ``email.message.Message`` to a local directory and to replace them
    inside the message with a short textual placeholder.

    The ``args`` object handed to the constructor is read for the
    attributes ``max_size`` (size threshold in KB), ``skip_download``,
    ``target`` (directory for saved attachments) and ``upload``
    (directory with local ``.eml``/``.emlx`` files).
    """

    _PLACEHOLDER = """
===========================================================
This message contained an attachment that was stripped out.
The filename was: "%(filename)s".
The size was: %(size)d KB.
The type was: %(type)s.
Tool: https://github.com/AlexanderWillner/MailboxCleanup
===========================================================
"""

    def __init__(self, args):
        """Store the parsed command line arguments / settings object."""

        self.args = args

    def download_and_detach_attachments(self, msg):
        """Download attachments and remove them from the mail.

        Walks over all parts of ``msg``; every detachable part is saved
        via ``download_attachment`` and, if that succeeded, replaced by a
        placeholder via ``detach_attachment``.

        Returns True if at least one part of the message was modified.
        """

        modified = False

        # Iterate over each part of the email
        for part in msg.walk():
            if self.is_non_detachable_part(part):
                continue
            if self.download_attachment(part):
                self.detach_attachment(part)
                modified = True

        return modified

    def is_non_detachable_part(self, part):
        """Only process certain types and sizes of attachments.

        A part is skipped (True is returned) if it is a multipart
        container, has no Content-Disposition header, or its serialized
        size in KB does not exceed ``args.max_size``.
        """

        maintype = part.get_content_maintype()
        msg_size = len(str(part)) / 1024
        logging.debug('    Part\t: %d KB / %d KB (type: %s)',
                      msg_size, self.args.max_size, maintype)

        return maintype == 'multipart' or \
            part.get('Content-Disposition') is None or \
            msg_size <= self.args.max_size

    def download_attachment(self, part) -> bool:
        """Download the attachment from a part of an email.

        Returns True if the part may be detached afterwards (download
        disabled, attachment stored, or payload empty) and False if the
        attachment has no usable filename and is therefore left alone.
        """

        if self.args.skip_download:
            logging.info('    Downloading\t: skipped (disabled)')
            return True

        raw_name = part.get_filename()
        if raw_name is None:
            logging.warning('Warning\t: Could not download attachment '
                            '(skipping this attachment)')
            return False

        file_attached = self.convert_filename(raw_name)

        if file_attached == "unknown":
            logging.warning('Warning\t: Unknown attachment '
                            '(skipping this attachment)')
            return False

        # makedirs also creates missing parent directories and does not
        # fail if the directory appeared in the meantime.
        os.makedirs(self.args.target, exist_ok=True)
        with tempfile.NamedTemporaryFile() as file_temp:
            logging.info('    Downloading\t: "%s" (%s)',
                         file_attached, part.get_content_maintype())
            logging.debug('    Downloading\t: To "%s"', file_temp.name)
            payload = part.get_payload(decode=True)
            if payload is not None:
                file_temp.write(payload)
                # The file is re-opened by name for hashing/copying, so
                # the buffered bytes have to be on disk before that.
                file_temp.flush()
                self._copy_file(file_temp.name, file_attached)
            else:
                logging.warning('    Downloading\t: File "%s" was empty',
                                file_attached)

        return True

    def _copy_file(self, source, target_name, iterator=0):
        """Copy file, check for duplicates via hash value.

        ``source`` is copied to ``args.target`` as ``target_name``. If a
        file with that name already exists and has the same SHA-256 hash
        nothing is copied; if the hash differs, the names ``base-1.ext``,
        ``base-2.ext``, ... are tried until a free name or an identical
        file is found. ``iterator`` is the suffix number to start with.
        """

        target_base, target_extension = os.path.splitext(target_name)
        source_hash = None  # computed lazily, at most once

        while True:
            suffix = "-" + str(iterator) if iterator > 0 else ""
            target = os.path.join(self.args.target,
                                  target_base + suffix + target_extension)
            if iterator == 0:
                logging.debug('    Moving\t: From "%s" to "%s".',
                              source, target)

            if not os.path.isfile(target):
                shutil.copy2(source, target)
                return

            if source_hash is None:
                source_hash = MailboxCleanerMessage.get_hash(source)
            if source_hash == MailboxCleanerMessage.get_hash(target):
                logging.debug('    Moving\t: Already exists (same hash)')
                return

            if iterator == 0:
                logging.debug(
                    '    Conflict\t: Resolving same file / other hash...')
            iterator += 1

    def process_directory(self, handler):
        """Upload messages from a local directory.

        Every ``.eml`` / ``.emlx`` file in ``args.upload`` is parsed, its
        attachments are downloaded and detached, and the resulting
        message object is passed to ``handler``.
        """

        directory = self.args.upload
        filenames = os.listdir(directory)
        total = len(filenames)

        for i, filename in enumerate(filenames, start=1):
            logging.warning('Progress\t: %d / %d', i, total)
            lowered = filename.lower()
            if not lowered.endswith((".eml", ".emlx")):
                continue

            filename = os.path.join(directory, filename)
            # Parse from bytes: mail files are not guaranteed to be valid
            # text in the locale encoding.
            with open(filename, "rb") as filepointer:
                if lowered.endswith(".emlx"):
                    # The first line of an .emlx file is skipped, it is
                    # not part of the message itself.
                    next(filepointer, None)
                msg = email.message_from_binary_file(filepointer)

            msg_subject = self.get_subject(msg)
            logging.warning('    File\t: %s (%s)', filename, msg_subject)

            # Remove attachments
            self.download_and_detach_attachments(msg)

            # Post process message (e.g. upload or save it)
            handler(msg)

    @staticmethod
    def detach_attachment(msg):
        """Replace large attachment with dummy text.

        ``msg`` (a single message part) is modified in place: it becomes
        a ``text/plain`` part whose payload is the placeholder text with
        the original filename, size and content type filled in.
        """

        # Get message details
        msg_content = msg.get_content_type()
        msg_filename = MailboxCleanerMessage.convert_filename(
            msg.get_filename())
        msg_size = len(str(msg)) / 1024
        msg_type = msg.get_content_disposition()

        # Remove some old headers
        del msg['Content-Transfer-Encoding']
        del msg['Content-Disposition']
        del msg['Content-Description']
        # Without a Content-Type header get_params() returns its failobj;
        # use an empty list so there is simply nothing to delete.
        for key, _value in msg.get_params(failobj=[])[1:]:
            msg.del_param(key)

        # Make sure different clients visualize the removed content properly
        msg.set_type('text/plain')
        msg.set_charset('utf-8')
        if msg_type == 'attachment':
            msg.add_header('Content-Disposition', 'inline')
        else:
            msg.add_header('Content-Disposition', 'attachment',
                           filename='removed-%s.txt' % msg_filename)
            msg.add_header('Content-Description',
                           'removed-%s.txt' % msg_filename)

        # Replace content (the payload is taken over already encoded from
        # a temporary MIMEText object)
        msg_details = dict(type=msg_content,
                           filename=msg_filename,
                           size=msg_size)
        msg_placeholder = MailboxCleanerMessage._PLACEHOLDER % msg_details
        msg_placeholder = email.mime.text.MIMEText(msg_placeholder,
                                                   'text', 'utf-8')
        msg.set_payload(msg_placeholder.get_payload())

    @staticmethod
    def get_uid(message) -> str:
        """Get UID of message.

        Returns the address part of the Message-ID header, or an empty
        string if the message has no such header.
        """

        parser = HeaderParser()
        header = parser.parsestr(message.as_string())
        message_id = header['message-id']
        if message_id is None:
            return ''
        return email.utils.parseaddr(message_id)[1]

    @staticmethod
    def get_subject(message) -> str:
        """Get shortened message subject for visualization.

        The subject is decoded, cut after 75 characters (followed by
        '...' if something was cut off) and freed from CRLF and tabs.
        Messages without subject yield "unknown".
        """

        if 'subject' in message:
            subject = message['subject']
        else:
            subject = "unknown"  # very rarely messages have no subject
        subject = MailboxCleanerMessage._decode_header_value(subject)
        subject = subject[:75] + (subject[75:] and '...')
        subject = subject.replace('\r\n', '')
        subject = subject.replace('\t', ' ')

        return subject

    @staticmethod
    def get_hash(filename: str) -> str:
        """Get hash from filename to detect duplicates.

        Returns the hexadecimal SHA-256 digest of the file content, read
        in blocks of 4096 bytes.
        """

        hash_value = hashlib.sha256()
        with open(filename, "rb") as handle:
            for byte_block in iter(lambda: handle.read(4096), b""):
                hash_value.update(byte_block)
        return hash_value.hexdigest()

    @staticmethod
    def slugify_filename(value):
        """Make sure attachments contain only valid characters.

        The value is NFKC-normalized; every character that is not a dot,
        a word character, whitespace or a hyphen becomes an underscore.
        """

        value = str(value)
        value = unicodedata.normalize('NFKC', value)
        value = re.sub(r'[^.\w\s-]', '_', value)
        return value

    @staticmethod
    def convert_filename(file_struct) -> str:
        """Decode the name of some attachments.

        Returns the decoded and slugified filename, or "unknown" if
        ``file_struct`` is None.
        """

        filename = 'unknown'
        if file_struct is not None:
            filename = MailboxCleanerMessage._decode_header_value(file_struct)

        return MailboxCleanerMessage.slugify_filename(filename)

    @staticmethod
    def _decode_header_value(value) -> str:
        """Decode all chunks of a (possibly encoded) header value.

        Undecodable bytes are replaced; chunks without charset or with a
        charset unknown to Python are decoded as UTF-8.
        """

        fragments = []
        for chunk, charset in email.header.decode_header(value):
            if isinstance(chunk, bytes):
                try:
                    chunk = chunk.decode(charset or 'utf-8',
                                         errors='replace')
                except LookupError:
                    chunk = chunk.decode('utf-8', errors='replace')
            fragments.append(chunk)
        return ''.join(fragments)
264