Passed
Push — main ( 2e83d2...71d461 )
by Alexander
01:28
created

src.mailbox_message   B

Complexity

Total Complexity 43

Size/Duplication

Total Lines 283
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 180
dl 0
loc 283
rs 8.96
c 0
b 0
f 0
wmc 43

12 Methods

Rating   Name   Duplication   Size   Complexity  
A MailboxCleanerMessage.__init__() 0 2 1
A MailboxCleanerMessage.slugify_filename() 0 8 1
A MailboxCleanerMessage.download_attachment() 0 25 5
A MailboxCleanerMessage.convert_filename() 0 14 3
A MailboxCleanerMessage.get_uid() 0 8 1
B MailboxCleanerMessage._copy_file() 0 25 6
A MailboxCleanerMessage.get_hash() 0 9 4
A MailboxCleanerMessage.download_and_detach_attachments() 0 21 5
A MailboxCleanerMessage.detach_attachment() 0 40 3
A MailboxCleanerMessage.get_subject() 0 17 4
A MailboxCleanerMessage.is_non_detachable_part() 0 11 1
C MailboxCleanerMessage.process_directory() 0 39 9

How to fix   Complexity   

Complexity

Complex classes like src.mailbox_message often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
"""
5
Module to download and to detach/strip/remove attachments
6
from e-mails on IMAP servers.
7
"""
8
9
from __future__ import print_function
10
11
import email
12
import email.mime.text
13
import email.utils
14
from email.parser import HeaderParser
15
import hashlib
16
import logging
17
import os.path
18
import re
19
import shutil
20
import tempfile
21
import time
22
import unicodedata
23
import src.emlx2eml
24
25
# pylint: disable=R0801
26
__author__ = "Alexander Willner"
27
__copyright__ = "Copyright 2020, Alexander Willner"
28
__credits__ = ["github.com/guido4000",
29
               "github.com/halteproblem", "github.com/jamesridgway"]
30
__license__ = "MIT"
31
__version__ = "1.0.0"
32
__maintainer__ = "Alexander Willner"
33
__email__ = "[email protected]"
34
__status__ = "Development"
35
36
37
class MailboxCleanerMessage:
    """
    Class to represent an e-mail.

    Bundles the operations needed to store the attachments of a message in
    a local directory and to replace them inside the message by a short
    text placeholder.
    """

    _PLACEHOLDER = """
===========================================================
This message contained an attachment that was stripped out.
The file was stored to: "%(newfile)s".
The original file name was: "%(filename)s".
The original size was: %(size)d KB.
The original type was: %(type)s.
Tool: https://github.com/AlexanderWillner/MailboxCleanup
===========================================================
"""

    def __init__(self, args):
        """Keep the options object used by all other methods.

        The methods of this class read ``args.target``, ``args.max_size``,
        ``args.detach``, ``args.skip_download`` and ``args.upload``.
        """

        self.args = args

    def download_and_detach_attachments(self, msg):
        """Download attachments and remove them from the mail.

        Walks over all parts of ``msg``, stores every detachable part via
        ``download_attachment`` and, if ``args.detach`` is set, replaces
        the part by the placeholder text.

        Returns True if at least one part of the message was replaced.
        """

        modified = False

        # The date is the same for every part, so determine it only once.
        date = self._get_timestamp(msg)

        # Iterate over each part of the email
        for part in msg.walk():
            if self.is_non_detachable_part(part):
                continue

            # Only download in relevant mode
            target = self.download_attachment(part, date)
            if target is None:
                continue

            # Only detach in relevant mode
            if not self.args.detach:
                logging.debug('      Detaching\t: skipped (disabled)')
                continue

            self.detach_attachment(part, target)
            modified = True

        return modified

    @staticmethod
    def _get_timestamp(msg) -> float:
        """Return the Date header as POSIX timestamp, or now if unusable."""

        header = msg.get('date')
        parsed = email.utils.parsedate_tz(str(header)) if header else None
        if parsed is not None:
            try:
                # mktime_tz honours the timezone offset of the header and
                # falls back to local time if the header carries none.
                return float(email.utils.mktime_tz(parsed))
            except (OverflowError, ValueError):
                pass

        # Missing or unparsable date: do not abort the whole message.
        logging.debug('    Date\t: missing or invalid, using current time')
        return time.time()

    def is_non_detachable_part(self, part):
        """Only process certain types and sizes of attachments.

        A part is skipped (True is returned) if it is a multipart
        container, has no Content-Disposition header or its serialized
        size in KB does not exceed ``args.max_size``.
        """

        maintype = part.get_content_maintype()
        msg_size = len(str(part)) / 1024
        logging.debug('    Part\t: %d KB / %d KB (type: %s)',
                      msg_size, self.args.max_size, maintype)

        return (maintype == 'multipart'
                or part.get('Content-Disposition') is None
                or msg_size <= self.args.max_size)

    def download_attachment(self, part, date) -> str:
        """Download the attachment from a part of an email.

        Returns the path of the stored file, an empty string if downloads
        are disabled via ``args.skip_download``, or None if the part has
        no usable file name or payload and was therefore skipped.
        """

        if self.args.skip_download:
            logging.info('      Downl.\t: skipped (disabled)')
            return ""

        file_attached = self.convert_filename(part.get_filename())

        if file_attached == "unknown":
            logging.warning('Warning\t: Unknown attachment '
                            '(skipping this attachment)')
            return None

        payload = part.get_payload(decode=True)
        if payload is None:
            # get_payload(decode=True) yields None for container parts.
            logging.warning('Warning\t: Attachment without payload '
                            '(skipping this attachment)')
            return None

        # Also creates missing parent directories and tolerates an
        # already existing target directory.
        os.makedirs(self.args.target, exist_ok=True)

        with tempfile.NamedTemporaryFile() as file_temp:
            logging.info('      Downl.\t: "%s" (%s)',
                         file_attached, part.get_content_maintype())
            logging.debug('      Downl.\t: To "%s"', file_temp.name)
            file_temp.write(payload)
            # The file is copied by name below, so buffered data has to
            # reach the disk first; otherwise the copy may be truncated.
            file_temp.flush()
            target = self._copy_file(file_temp.name, file_attached, date)

        return target

    def _copy_file(self, source, target_name, date, iterator=0) -> str:
        """Copy file, check for duplicates via hash value.

        If the target name is already taken by a file with other content,
        "-1", "-2", ... is appended to the base name until a free name or
        an identical file is found. ``iterator`` is the first suffix that
        is tried (0 means no suffix).

        Returns the path of the file that holds the content of ``source``.
        """

        target_base, target_extension = os.path.splitext(target_name)
        source_hash = None
        attempt = iterator

        while True:
            suffix = "-" + str(attempt) if attempt > 0 else ""
            target = os.path.join(self.args.target,
                                  target_base + suffix + target_extension)
            if attempt == 0:
                logging.debug('      Moving\t: From "%s" to "%s".',
                              source, target)

            if not os.path.isfile(target):
                shutil.copy2(source, target)
                os.utime(target, (date, date))
                return target

            # Name is taken: identical content means nothing to do.
            if source_hash is None:
                source_hash = MailboxCleanerMessage.get_hash(source)
            if source_hash == MailboxCleanerMessage.get_hash(target):
                logging.debug('      Moving\t: Already exists (same hash)')
                return target

            if attempt == 0:
                logging.debug(
                    '      Conflict\t: Resolving same file / other hash..')
            attempt += 1

    def process_directory(self, handler, folder=None):
        """Upload messages from a local directory.

        ``folder`` (default: ``args.upload``) is either a directory, which
        is processed recursively, or a single message file. Every ".eml"
        and ".emlx" file is parsed, its attachments are downloaded and
        detached, and the resulting message is passed to ``handler``.
        """

        source = self.args.upload if folder is None else folder
        if os.path.isfile(source):
            # A single file: process just this file, not its directory.
            directory, single_name = os.path.split(source)
            filenames = [single_name]
        else:
            directory = source
            filenames = os.listdir(source)

        total = len(filenames)
        for i, name in enumerate(filenames, start=1):
            filename = os.path.join(directory, name)

            # Recursive walker
            if os.path.isdir(filename):
                self.process_directory(handler, filename)
                continue  # a directory is never a message itself

            # Only take eml files into account
            if not filename.lower().endswith((".eml", ".emlx")):
                continue

            logging.warning('Files\t\t: %d / %d', i, total)

            msg = self._load_message(filename)

            # Logging
            msg_subject = self.get_subject(msg)
            logging.warning('    File\t: %s (%s)', filename, msg_subject)

            # Remove attachments
            self.download_and_detach_attachments(msg)

            # Post process message (e.g. upload or save it)
            handler(msg)

    @staticmethod
    def _load_message(filename):
        """Parse an eml or emlx file into a message object."""

        # Specific handling of emlx files
        if filename.lower().endswith(".emlx"):
            return src.emlx2eml.parse_emlx(filename)

        # Parse from bytes: mails may contain 8-bit data that cannot be
        # decoded with the default text encoding of the platform.
        with open(filename, "rb") as filepointer:
            return email.message_from_binary_file(filepointer)

    @staticmethod
    def detach_attachment(msg, target):
        """Replace large attachment with dummy text.

        ``msg`` is the message part to be replaced (modified in place),
        ``target`` the location of the stored file that is mentioned in
        the placeholder text.
        """

        # Get message details
        msg_content = msg.get_content_type()
        msg_filename = MailboxCleanerMessage.convert_filename(
            msg.get_filename())
        msg_size = len(str(msg)) / 1024
        msg_type = msg.get_content_disposition()

        logging.debug('      Detaching\t: %s', msg_filename)

        # Remove some old headers
        del msg['Content-Transfer-Encoding']
        del msg['Content-Disposition']
        del msg['Content-Description']
        # get_params() returns None if there is no Content-Type header;
        # the first entry is the content type itself and is kept.
        for k, _v in (msg.get_params() or [])[1:]:
            msg.del_param(k)

        # Make sure different clients visualize the removed content properly
        msg.set_type('text/plain')
        msg.set_charset('utf-8')
        if msg_type == 'attachment':
            msg.add_header('Content-Disposition', 'inline')
        else:
            msg.add_header('Content-Disposition', 'attachment',
                           filename='removed-%s.txt' % msg_filename)
            msg.add_header('Content-Description',
                           'removed-%s.txt' % msg_filename)

        # Replace content
        msg_details = dict(newfile=target,
                           type=msg_content,
                           filename=msg_filename,
                           size=msg_size)
        msg_placeholder = MailboxCleanerMessage._PLACEHOLDER % msg_details
        # Only the encoded payload of this helper object is used.
        msg_placeholder = email.mime.text.MIMEText(msg_placeholder,
                                                   'text', 'utf-8')
        msg.set_payload(msg_placeholder.get_payload())

    @staticmethod
    def get_uid(message) -> str:
        """Get UID of message.

        Returns the address part of the Message-ID header, or an empty
        string if the message has no such header.
        """

        parser = HeaderParser()
        header = parser.parsestr(message.as_string())
        message_id = header['message-id']
        if message_id is None:
            return ''
        return email.utils.parseaddr(str(message_id))[1]

    @staticmethod
    def _decode_header_value(value) -> str:
        """Decode all RFC 2047 fragments of a header value to one string."""

        from email.errors import HeaderParseError
        from email.header import decode_header

        try:
            fragments = decode_header(value)
        except HeaderParseError:
            # Broken encoded word: better show the raw value than fail.
            return str(value)

        decoded = []
        for text, charset in fragments:
            if isinstance(text, bytes):
                try:
                    text = text.decode(charset or 'utf-8', errors='replace')
                except LookupError:
                    # Charset label unknown to Python
                    text = text.decode('utf-8', errors='replace')
            decoded.append(text)
        return ''.join(decoded)

    @staticmethod
    def get_subject(message) -> str:
        """Get shortened message subject for visualization.

        The decoded subject is cut after 75 characters (marked by "...")
        and freed from line breaks and tabs so it fits into one log line.
        """

        if 'subject' in message:
            subject = message['subject']
        else:
            subject = "unknown"  # very rarely messages have no subject
        subject = MailboxCleanerMessage._decode_header_value(subject)
        subject = subject[:75] + (subject[75:] and '...')
        subject = subject.replace('\r\n', '')
        # Folded headers of locally parsed files use bare line breaks.
        subject = subject.replace('\n', '').replace('\r', '')
        subject = subject.replace('\t', ' ')

        return subject

    @staticmethod
    def get_hash(filename: str) -> str:
        """Get hash from filename to detect duplicates.

        Returns the hexadecimal SHA-256 digest of the file content.
        """

        hash_value = hashlib.sha256()
        with open(filename, "rb") as file:
            # Read in blocks to keep memory usage low for large files.
            for byte_block in iter(lambda: file.read(4096), b""):
                hash_value.update(byte_block)
        return hash_value.hexdigest()

    @staticmethod
    def slugify_filename(value):
        """Make sure attachments contain only valid characters.

        Every character that is not a dot, word character, whitespace or
        hyphen is replaced by an underscore.
        """

        value = unicodedata.normalize('NFKC', str(value))
        return re.sub(r'[^.\w\s-]', '_', value)

    @staticmethod
    def convert_filename(file_struct) -> str:
        """Decode the name of some attachments.

        Returns the decoded and slugified file name, or "unknown" if no
        usable name is given (None, empty, "." or "..").
        """

        if file_struct is None:
            return 'unknown'

        filename = MailboxCleanerMessage.slugify_filename(
            MailboxCleanerMessage._decode_header_value(file_struct))

        # Such names would address a directory instead of a file.
        if filename.strip() in ('', '.', '..'):
            return 'unknown'

        return filename
283