Passed
Push — main ( d4414f...bd5175 )
by Alexander
02:18
created

src.mailbox_message   A

Complexity

Total Complexity 42

Size/Duplication

Total Lines 269
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 171
dl 0
loc 269
rs 9.0399
c 0
b 0
f 0
wmc 42

12 Methods

Rating   Name   Duplication   Size   Complexity  
A MailboxCleanerMessage.__init__() 0 2 1
A MailboxCleanerMessage.download_and_detach_attachments() 0 15 4
A MailboxCleanerMessage.is_non_detachable_part() 0 11 1
A MailboxCleanerMessage.download_attachment() 0 25 5
B MailboxCleanerMessage._copy_file() 0 22 6
A MailboxCleanerMessage.slugify_filename() 0 8 1
A MailboxCleanerMessage.convert_filename() 0 14 3
A MailboxCleanerMessage.get_uid() 0 8 1
A MailboxCleanerMessage.get_hash() 0 9 4
A MailboxCleanerMessage.detach_attachment() 0 37 3
A MailboxCleanerMessage.get_subject() 0 17 4
C MailboxCleanerMessage.process_directory() 0 39 9

How to fix   Complexity   

Complexity

Complex classes like src.mailbox_message often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
"""
5
Module to download and to detach/strip/remove attachments
6
from e-mails on IMAP servers.
7
"""
8
9
from __future__ import print_function
10
11
import email
12
import email.mime.text
13
import email.utils
14
from email.parser import HeaderParser
15
import hashlib
16
import logging
17
import os.path
18
import re
19
import shutil
20
import tempfile
21
import unicodedata
22
import emlx
23
24
# pylint: disable=R0801
25
# Module metadata (author, licence and version information).
__author__ = "Alexander Willner"
__copyright__ = "Copyright 2020, Alexander Willner"
__credits__ = [
    "github.com/guido4000",
    "github.com/halteproblem",
    "github.com/jamesridgway",
]
__license__ = "MIT"
__version__ = "1.0.0"
__maintainer__ = "Alexander Willner"
# NOTE(review): this value looks like an e-mail obfuscation placeholder
# rather than a real address - confirm against the upstream repository.
__email__ = "[email protected]"
__status__ = "Development"
34
35
36
class MailboxCleanerMessage():
    """
    Download attachments from e-mail messages and strip them out.

    Attachments above a size limit are written to a local target directory
    and replaced inside the message by a short plain-text placeholder.
    """

    # Text that takes the place of a removed attachment. It is filled in
    # with the ``filename``, ``size`` (in KB) and ``type`` of the old part.
    _PLACEHOLDER = """
===========================================================
This message contained an attachment that was stripped out.
The filename was: "%(filename)s".
The size was: %(size)d KB.
The type was: %(type)s.
Tool: https://github.com/AlexanderWillner/MailboxCleanup
===========================================================
"""

    def __init__(self, args):
        """Store the options object.

        The methods of this class read ``args.max_size`` (KB),
        ``args.skip_download``, ``args.target`` and ``args.upload``.
        """
        self.args = args

    def download_and_detach_attachments(self, msg):
        """Download attachments and remove them from the mail.

        Every part of ``msg`` that qualifies is saved and then replaced
        by a placeholder. A part is only replaced if saving it succeeded.

        Returns True if at least one part of the message was replaced.
        """

        modified = False

        # Iterate over each part of the email
        for part in msg.walk():
            if self.is_non_detachable_part(part):
                continue
            if self.download_attachment(part):
                self.detach_attachment(part)
                modified = True

        return modified

    def is_non_detachable_part(self, part):
        """Only process certain types and sizes of attachments.

        Returns True (leave the part alone) for multipart containers, for
        parts without a Content-Disposition header and for parts whose
        serialized size in KB does not exceed ``args.max_size``.
        """

        msg_size = len(str(part)) / 1024
        content_maintype = part.get_content_maintype()
        logging.debug('    Part\t: %d KB / %d KB (type: %s)',
                      msg_size, self.args.max_size, content_maintype)

        return content_maintype == 'multipart' or \
            part.get('Content-Disposition') is None or \
            msg_size <= self.args.max_size

    def download_attachment(self, part) -> bool:
        """Download the attachment from a part of an email.

        The decoded payload is written to ``args.target`` under the
        sanitized attachment name.

        Returns True if the part may be detached afterwards, i.e. the
        payload was stored or downloading is disabled; False if the part
        has no usable filename or no payload.
        """

        if self.args.skip_download:
            logging.info('    Downloading\t: skipped (disabled)')
            return True

        file_attached = self.convert_filename(part.get_filename())

        if file_attached == "unknown":
            logging.warning('Warning\t: Unknown attachment '
                            '(skipping this attachment)')
            return False

        payload = part.get_payload(decode=True)
        if payload is None:
            # Nothing to save; keep the part instead of failing on write.
            logging.warning('Warning\t: Attachment "%s" has no payload '
                            '(skipping this attachment)', file_attached)
            return False

        # Also creates missing parent directories and tolerates a
        # directory that already exists.
        os.makedirs(self.args.target, exist_ok=True)
        with tempfile.NamedTemporaryFile() as file_temp:
            logging.info('    Downloading\t: "%s" (%s)',
                         file_attached, part.get_content_maintype())
            logging.debug('    Downloading\t: To "%s"', file_temp.name)
            file_temp.write(payload)
            # The file is copied by name below, so buffered data has to
            # reach the disk first or the copy may be empty or truncated.
            file_temp.flush()
            self._copy_file(file_temp.name, file_attached)

        return True

    def _copy_file(self, source, target_name, iterator=0):
        """Copy file, check for duplicates via hash value.

        If a file with the same name but different content exists, the
        names "<base>-1<ext>", "<base>-2<ext>", ... are tried in turn.
        If a file with identical content exists, nothing is copied.

        ``iterator`` is the numeric suffix to start with (0 = no suffix).
        """

        target_base, target_extension = os.path.splitext(target_name)
        source_hash = None  # computed lazily, at most once

        while True:
            suffix = "-" + str(iterator) if iterator > 0 else ""
            target = os.path.join(self.args.target,
                                  target_base + suffix + target_extension)
            if iterator == 0:
                logging.debug('    Moving\t: From "%s" to "%s".',
                              source, target)

            if not os.path.isfile(target):
                shutil.copy2(source, target)
                return

            if source_hash is None:
                source_hash = MailboxCleanerMessage.get_hash(source)
            if source_hash == MailboxCleanerMessage.get_hash(target):
                logging.debug('    Moving\t: Already exists (same hash)')
                return

            if iterator == 0:
                logging.debug(
                    '    Conflict\t: Resolving same file / other hash...')
            iterator += 1

    def process_directory(self, handler, folder=None):
        """Upload messages from a local directory.

        ``folder`` (default: ``args.upload``) may be a directory, which is
        walked recursively, or a single message file. Each ".eml"/".emlx"
        file is parsed, stripped of its large attachments and then passed
        to ``handler(msg)``.
        """

        source = self.args.upload if folder is None else folder
        if os.path.isfile(source):
            # A single file was given: process exactly that file.
            filenames = [source]
        else:
            filenames = [os.path.join(source, name)
                         for name in os.listdir(source)]

        for i, filename in enumerate(filenames, start=1):
            # Recursive walker
            if os.path.isdir(filename):
                self.process_directory(handler, filename)
                continue

            # Only take eml files into account
            if not filename.lower().endswith((".eml", ".emlx")):
                continue

            logging.warning('Files\t: %d / %d', i, len(filenames))

            msg = self._read_message(filename)

            # Logging
            msg_subject = self.get_subject(msg)
            logging.warning('    File\t: %s (%s)', filename, msg_subject)

            # Remove attachments
            self.download_and_detach_attachments(msg)

            # Post process message (e.g. upload or save it)
            handler(msg)

    @staticmethod
    def _read_message(filename):
        """Parse a single ".eml" or ".emlx" file into a message object."""

        # Specific handling of emlx files
        if filename.lower().endswith(".emlx"):
            return emlx.read(filename)

        # Parse the raw bytes: reading the file as text would depend on
        # the locale encoding and can fail on 8-bit content.
        with open(filename, "rb") as filepointer:
            return email.message_from_binary_file(filepointer)

    @staticmethod
    def detach_attachment(msg):
        """Replace large attachment with dummy text.

        The part is modified in place: it becomes a "text/plain" part
        whose payload is the placeholder text naming the original
        filename, size and content type.
        """

        # Get message details
        msg_content = msg.get_content_type()
        msg_filename = MailboxCleanerMessage.convert_filename(
            msg.get_filename())
        msg_size = len(str(msg)) / 1024
        msg_type = msg.get_content_disposition()

        # Remove some old headers
        del msg['Content-Transfer-Encoding']
        del msg['Content-Disposition']
        del msg['Content-Description']
        # get_params() returns None when there is no Content-Type header
        for k, _v in (msg.get_params() or [])[1:]:
            msg.del_param(k)

        # Make sure different clients visualize the removed content properly
        msg.set_type('text/plain')
        msg.set_charset('utf-8')
        if msg_type == 'attachment':
            msg.add_header('Content-Disposition', 'inline')
        else:
            msg.add_header('Content-Disposition', 'attachment',
                           filename='removed-%s.txt' % msg_filename)
            msg.add_header('Content-Description',
                           'removed-%s.txt' % msg_filename)

        # Replace content
        msg_details = dict(type=msg_content,
                           filename=msg_filename,
                           size=msg_size)
        msg_placeholder = MailboxCleanerMessage._PLACEHOLDER % msg_details
        msg_placeholder = email.mime.text.MIMEText(msg_placeholder,
                                                   'text', 'utf-8')
        msg.set_payload(msg_placeholder.get_payload())

    @staticmethod
    def get_uid(message) -> str:
        """Get UID of message.

        Returns the address part of the Message-ID header, or an empty
        string if the message has no such header.
        """

        # Read the header directly instead of serializing and re-parsing
        # the whole message (including all attachments).
        message_id = message['message-id']
        if message_id is None:
            return ''
        return email.utils.parseaddr(str(message_id))[1]

    @staticmethod
    def get_subject(message) -> str:
        """Get shortened message subject for visualization.

        The subject is decoded, cut to 75 characters (followed by "..."
        if something was cut off) and stripped of CRLF and tabs.
        """

        if 'subject' in message:
            subject = message['subject']
        else:
            subject = "unknown"  # very rarely messages have no subject
        subject = MailboxCleanerMessage._decode_header_value(subject)
        subject = subject[:75] + (subject[75:] and '...')
        subject = subject.replace('\r\n', '')
        subject = subject.replace('\t', ' ')

        return subject

    @staticmethod
    def get_hash(filename: str) -> str:
        """Get hash from filename to detect duplicates.

        Returns the hexadecimal SHA-256 digest of the file content.
        """

        hash_value = hashlib.sha256()
        with open(filename, "rb") as handle:
            # Read in blocks so that large files need not fit in memory.
            for byte_block in iter(lambda: handle.read(4096), b""):
                hash_value.update(byte_block)
        return hash_value.hexdigest()

    @staticmethod
    def slugify_filename(value):
        """Make sure attachments contain only valid characters.

        Every character that is not a dot, a word character, whitespace
        or a hyphen is replaced by an underscore.
        """

        value = str(value)
        value = unicodedata.normalize('NFKC', value)
        value = re.sub(r'[^.\w\s-]', '_', value)
        return value

    @staticmethod
    def convert_filename(file_struct) -> str:
        """Decode the name of some attachments.

        Returns the decoded and sanitized filename, or "unknown" if no
        usable name is available.
        """

        if file_struct is None:
            return MailboxCleanerMessage.slugify_filename('unknown')

        filename = MailboxCleanerMessage.slugify_filename(
            MailboxCleanerMessage._decode_header_value(file_struct))

        # Names that are empty or consist of dots/whitespace only (such
        # as "..") would address a directory, not a file inside the target.
        if not filename.replace('.', '').strip():
            return 'unknown'
        return filename

    @staticmethod
    def _decode_header_value(value) -> str:
        """Decode a (possibly RFC 2047 encoded) header value to text."""

        from email.errors import HeaderParseError

        try:
            chunks = email.header.decode_header(value)
        except HeaderParseError:
            # Malformed encoded word: fall back to the raw value.
            return str(value)

        decoded = []
        # A header may consist of several chunks with different charsets.
        for text, charset in chunks:
            if isinstance(text, bytes):
                try:
                    text = text.decode(charset or 'utf-8', errors='replace')
                except LookupError:
                    # The charset named in the header is not known.
                    text = text.decode('utf-8', errors='replace')
            decoded.append(text)
        return ''.join(decoded)
269