Completed
Push — main ( eac31d...478f4a )
by Alexander
01:30
created

src.mailbox_cleaner   F

Complexity

Total Complexity 82

Size/Duplication

Total Lines 561
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 370
dl 0
loc 561
rs 2
c 0
b 0
f 0
wmc 82

24 Methods

Rating   Name   Duplication   Size   Complexity  
A MailboxCleaner.__init__() 0 6 1
A MailboxCleaner.upload_msg_to_server() 0 9 1
B MailboxCleaner.download_attachment() 0 34 7
A MailboxCleaner.slugify_filename() 0 8 1
A MailboxCleaner.get_subject() 0 17 4
C MailboxCleaner.process_folder_locally() 0 45 9
A MailboxCleaner.get_hash() 0 9 4
A MailboxCleaner.is_non_detachable_part() 0 11 1
A MailboxCleaner.get_flags_from_struct() 0 9 2
A MailboxCleaner.get_msgs_from_server_folder() 0 14 1
B MailboxCleaner.get_msg_from_server() 0 30 6
A MailboxCleaner.get_mail_from_struct() 0 13 3
B MailboxCleaner.__copy_file() 0 22 6
A MailboxCleaner.login() 0 12 3
A MailboxCleaner.get_folders_from_server() 0 20 2
A MailboxCleaner.convert_date() 0 8 1
A MailboxCleaner.download_and_detach_attachments() 0 15 4
A MailboxCleaner.run() 0 16 3
A MailboxCleaner.replace_msg_on_server() 0 20 4
A MailboxCleaner.__load_cache() 0 10 4
A MailboxCleaner.convert_filename() 0 14 3
B MailboxCleaner.process_folders_on_server() 0 37 5
A MailboxCleaner.detach_attachment() 0 35 3
A MailboxCleaner.__save_cache() 0 5 2

2 Functions

Rating   Name   Duplication   Size   Complexity  
A main() 0 6 1
B handle_arguments() 0 43 1

How to fix   Complexity   

Complexity

Complex classes like src.mailbox_cleaner often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
"""
5
Module to download and to detach/strip/remove attachments
6
from e-mails on IMAP servers.
7
"""
8
9
from __future__ import print_function
10
import email
11
import email.utils
12
import email.mime.text
13
import tempfile
14
import shutil
15
import time
16
import re
17
import unicodedata
18
import hashlib
19
import typing
20
import os.path
21
import socket
22
import pickle
23
import argparse
24
import logging
25
import collections
26
import imaplib
27
imaplib._MAXLINE = 10000000  # pylint: disable=protected-access
28
29
__author__ = "Alexander Willner"
30
__copyright__ = "Copyright 2020, Alexander Willner"
31
__credits__ = ["github.com/guido4000",
32
               "github.com/halteproblem", "github.com/jamesridgway"]
33
__license__ = "MIT"
34
__version__ = "1.0.0"
35
__maintainer__ = "Alexander Willner"
36
__email__ = "[email protected]"
37
__status__ = "Development"
38
39
40
class MailboxCleaner():
    """
    Download and detach/strip/remove attachments from e-mails
    on IMAP servers.

    The ``args`` namespace handed to the constructor must provide the
    attributes read by this class: ``server``, ``user``, ``password``,
    ``folder``, ``all``, ``detach``, ``upload``, ``target``, ``max_size``,
    ``skip_download`` and ``reset_cache``.
    """

    # Number of attempts made to fetch a single message from the server.
    __RETRIES = 2

    # Folders whose (possibly quoted) name starts with one of these
    # prefixes are skipped when iterating over all folders.
    __PREFIXES = ('Contacts', 'Calendar', '"Calendar',
                  'Trash', '"Deleted', 'Tasks',
                  '"[Gmail]"')

    # Text that replaces the payload of a detached attachment.
    __PLACEHOLDER = """
===========================================================
This message contained an attachment that was stripped out.
The filename was: "%(filename)s".
The size was: %(size)d KB.
The type was: %(type)s.
Tool: https://github.com/AlexanderWillner/MailboxCleanup
===========================================================
"""

    # One line of an IMAP LIST response: (flags) "delimiter" name
    __LIST_RESPONSE = re.compile(
        r'\((?P<flags>[^)]*)\)\s+(?P<delimiter>"[^"]*"|NIL)\s+(?P<name>.*)')

    def __init__(self, args):
        """Store the parsed command line arguments; no I/O happens here."""
        self.cache = collections.OrderedDict()
        self.args = args
        self.cache_file = args.server + '_cache.pkl'
        # Without --detach the mailbox is only read, never modified.
        self.readonly = not self.args.detach
        self.imap: typing.Optional[imaplib.IMAP4_SSL] = None

    def run(self):
        """Login and process mails.

        Raises:
            SystemExit: if the login fails or the user interrupts the run.
        """
        try:
            self.login()
            self.__load_cache()
            # Only persist the cache once it has been loaded; saving it
            # after a failed login would overwrite the existing cache file
            # with an empty one.
            try:
                folders = self.get_folders_from_server()
                if self.args.upload:
                    self.process_folder_locally()
                else:
                    self.process_folders_on_server(folders)
                    self.imap.close()
                self.imap.logout()
            finally:
                self.__save_cache()
        except KeyboardInterrupt as error:
            raise SystemExit('\nCancelling...') from error

    def __load_cache(self):
        """Load cache of processed mails with their subjects.

        A missing cache file (or ``--reset-cache``) results in a fresh file
        being written first. An empty or corrupt cache file is treated as
        an empty cache instead of aborting the run.
        """

        # Create new cache if needed
        if not os.path.exists(self.cache_file) or self.args.reset_cache:
            self.__save_cache()

        try:
            with open(self.cache_file, 'rb') as filepointer:
                # NOTE(security): pickle executes code from the file it
                # loads; the cache file must only ever be written by this
                # tool and never be taken from an untrusted source.
                self.cache = pickle.load(filepointer)
        except (EOFError, pickle.UnpicklingError) as error:
            logging.warning('Cache\t\t: unreadable, starting fresh (%s)',
                            error)
            self.cache = collections.OrderedDict()

    def __save_cache(self):
        """Save cache of processed mails with their subjects."""

        with open(self.cache_file, 'wb+') as filepointer:
            pickle.dump(self.cache, filepointer, pickle.HIGHEST_PROTOCOL)

    def login(self):
        """Log into the IMAP server.

        Raises:
            SystemExit: if the server cannot be reached or rejects the
                credentials.
        """

        try:
            self.imap = imaplib.IMAP4_SSL(self.args.server)
            self.imap.login(self.args.user, self.args.password)
        except OSError as error:
            # Covers name resolution failures (socket.gaierror) as well as
            # refused connections, timeouts and TLS errors.
            raise SystemExit('Login failed (wrong server?): %s' %
                             error) from error
        except imaplib.IMAP4.error as error:
            raise SystemExit('Login failed (wrong password?): %s' %
                             error) from error

    def process_folder_locally(self):
        """Upload messages from a local directory.

        Every ``.eml``/``.emlx`` file in ``args.upload`` is parsed, stripped
        of large attachments and appended to ``args.folder``. Messages are
        de-duplicated by their Message-ID, first against the local cache and
        then against the server.
        """

        directory = self.args.upload
        msg_flags = '\\Seen'
        msg_folder = self.args.folder

        for filename in os.listdir(directory):
            if not filename.lower().endswith(('.eml', '.emlx')):
                continue

            filename = os.path.join(directory, filename)
            with open(filename) as filepointer:
                msg = email.message_from_file(filepointer)

            msg_subject = self.get_subject(msg)
            msg_uid = msg['message-id'] if 'message-id' in msg else None
            logging.warning('File\t\t: %s (%s)', filename, msg_subject)

            # Messages without a Message-ID cannot be de-duplicated: using
            # ``None`` as the key would make every such message look like a
            # duplicate of the first one.
            if msg_uid is not None:
                # Check cache
                if msg_uid in self.cache:
                    logging.warning('    Cache\t: OK')
                    continue

                # Check for duplicates
                self.imap.select(msg_folder, readonly=True)
                status, data = self.imap.uid(
                    'SEARCH', '(HEADER Message-ID "%s")' % msg_uid)
                if data and data[0]:
                    logging.warning('    Duplicate\t: %s', status)
                    self.cache[msg_uid] = msg_subject
                    continue

            # Remove attachments
            self.download_and_detach_attachments(msg)

            # Upload message
            status, data = self.upload_msg_to_server(msg, msg_flags,
                                                     msg_folder)
            if status == "OK":
                logging.warning('    Success\t: %s', status)
                if msg_uid is not None:
                    self.cache[msg_uid] = msg_subject
            else:
                logging.warning('    Error\t\t: %s', data)

    def process_folders_on_server(self, folders):
        """Iterate over mails in given folders.

        Args:
            folders: names of the IMAP folders to process.
        """

        # Iterate over each folder
        for i, folder in enumerate(folders, start=1):

            # Get all mails in this folder
            logging.info('Progress\t: %s / %s (folders)', i, len(folders))
            logging.warning('Folder\t\t: %s (started)', folder)
            msg_uids = self.get_msgs_from_server_folder(folder)

            # Iterate over each email
            for j, msg_uid in enumerate(msg_uids, start=1):

                # IMAP UIDs are only unique within one mailbox, so the
                # folder has to be part of the cache key; otherwise a mail
                # in another folder with the same UID would be skipped.
                cache_key = (folder, msg_uid)

                # Skip if already in cache
                logging.info('Progress\t: %s / %s (mail uid: %s)',
                             j, len(msg_uids), msg_uid.decode())
                if cache_key in self.cache:
                    logging.info('  Subject\t: %s (cached)',
                                 self.cache[cache_key])
                    continue

                # Get the actual email
                msg, msg_flags = self.get_msg_from_server(msg_uid)
                subject = self.get_subject(msg)
                logging.info('  Subject\t: %s', subject)

                # Download and detach attachments from email
                modified = self.download_and_detach_attachments(msg)

                # Upload new email
                if modified:
                    self.replace_msg_on_server(msg, msg_flags, folder, msg_uid)

                self.cache[cache_key] = subject

            logging.warning('Folder\t\t: %s (completed)', folder)

    def download_and_detach_attachments(self, msg):
        """Download attachments and remove them from the mail.

        Returns:
            True if at least one part of ``msg`` was replaced.
        """

        modified = False

        # Iterate over each part of the email
        for part in msg.walk():
            if self.is_non_detachable_part(part):
                continue
            # Only strip what could be stored (or was deliberately skipped).
            if self.download_attachment(part):
                self.detach_attachment(part)
                modified = True

        return modified

    def replace_msg_on_server(self, msg, msg_flags, folder, msg_uid):
        """Replace old/large message on the server.

        The stripped message is appended first; the original is only
        flagged as deleted and expunged when that upload succeeded.
        """

        # Only upload in non-readonly mode
        if self.readonly:
            logging.debug('    Detaching\t: skipped (read-only mode)')
            return

        # Upload new message and delete the old one
        status, data = self.upload_msg_to_server(msg, msg_flags, folder)
        if status != 'OK':
            logging.warning('    Error\t: "%s"', data)
            return

        self.imap.uid('STORE', msg_uid, '+FLAGS', '\\Deleted')
        # GMail needs special treatment; other servers reject the
        # extension, which is deliberately ignored.
        try:
            self.imap.uid('STORE', msg_uid, '+X-GM-LABELS', '\\Trash')
        except imaplib.IMAP4.error:
            pass
        self.imap.expunge()

    def upload_msg_to_server(self, msg, msg_flags, folder):
        """Upload a message to the server.

        Returns:
            The ``(status, data)`` tuple of the IMAP APPEND command.
        """

        # Knowing what's going on
        msg_date = self.convert_date(msg.get('date'))
        logging.debug('    Uploading\t: %s / %s', msg_date, msg_flags)

        return self.imap.append(
            folder, msg_flags, msg_date, msg.as_string().encode())

    def is_non_detachable_part(self, part):
        """Only process certain types and sizes of attachments.

        Returns:
            True if ``part`` must be left untouched: it is a multipart
            container, has no Content-Disposition header or is not larger
            than ``args.max_size`` (in KB).
        """

        msg_size = len(str(part)) / 1024
        # The limit may arrive as a string from the command line.
        max_size = float(self.args.max_size)
        logging.debug('    Part\t: %d KB / %d KB (type: %s)',
                      msg_size, max_size,
                      part.get_content_maintype())

        return part.get_content_maintype() == 'multipart' or \
            part.get('Content-Disposition') is None or \
            msg_size <= max_size

    def get_msg_from_server(self, uid):
        """Fetch an email from the IMAP server.

        Returns:
            A ``(message, flags)`` tuple.

        Raises:
            imaplib.IMAP4.error: if no message body could be fetched after
                all retries.
        """

        # Sometimes IMAP servers might return empty bodies, so try again
        for _ in range(self.__RETRIES):
            try:
                result, data = self.imap.uid('fetch', uid,
                                             '(UID BODY.PEEK[] FLAGS)')
            except imaplib.IMAP4.error:
                continue

            if not data or data[0] is None:
                logging.warning('  Error\t: '
                                'Could not get a message body. '
                                'Retrying in a few seconds...')
                time.sleep(2)
                continue

            body = data[0][1]
            logging.debug('  Result (Size)\t: %s (%d KB)',
                          result, len(body) / 1024)

            msg = self.get_mail_from_struct(data)
            msg_flags = self.get_flags_from_struct(data)

            logging.debug('  Flags\t\t: %s', msg_flags)

            return (msg, msg_flags)

        raise imaplib.IMAP4.error('Could not get a message body')

    def get_msgs_from_server_folder(self, folder):
        """Get all emails from a folder on the IMAP server.

        Returns:
            The list of message UIDs (as bytes) found in ``folder``.
        """

        # Safety net: enable read-only if requested
        logging.warning('Read Only\t: %s', self.readonly)
        self.imap.select(folder, readonly=self.readonly)

        # Extract email UIDs
        result_mails, data_mails = self.imap.uid('search', None, "ALL")
        msg_uids = (data_mails[0] or b'').split()
        logging.warning('Mails (#)\t: %s (%s)',
                        result_mails, len(msg_uids))

        return msg_uids

    def get_folders_from_server(self) -> typing.List[str]:
        """Get the folders from the IMAP server to iterate through.

        Returns:
            ``[args.folder]`` unless ``args.all`` is set; otherwise every
            folder reported by the server whose name does not start with
            one of the excluded prefixes. Names keep the quoting used by
            the server.
        """

        res, folder_list = self.imap.list()

        logging.warning('Folders (#)\t: %s (%s)', res, len(folder_list))
        logging.warning('All Folders\t: %s', self.args.all)

        if not self.args.all:
            return [self.args.folder]

        folders = []
        for item in folder_list:
            if item is None:  # server reported no folders at all
                continue
            line = item.decode()
            match = self.__LIST_RESPONSE.match(line)
            if match:
                # Works for any hierarchy delimiter ("/", ".", ...).
                name = match.group('name')
            else:
                name = line.split('"/"')[-1]
            name = name.strip()
            if not name.startswith(self.__PREFIXES):
                folders.append(name)

        return folders

    def download_attachment(self, part) -> bool:
        """Download the attachment from a part of an email.

        Returns:
            True if the part may be detached afterwards (it was stored,
            was empty, or downloading is disabled), False if it has no
            usable filename and is therefore skipped.
        """

        if self.args.skip_download:
            logging.info('    Downloading\t: skipped (disabled)')
            return True

        if part.get_filename() is None:
            logging.warning('Warning\t: Could not download attachment '
                            '(skipping this attachment)')
            return False

        file_attached = self.convert_filename(part.get_filename())

        if file_attached == "unknown":
            logging.warning('Warning\t: Unknown attachment '
                            '(skipping this attachment)')
            return False

        os.makedirs(self.args.target, exist_ok=True)
        with tempfile.NamedTemporaryFile() as file_temp:
            logging.info('    Downloading\t: "%s" (%s)',
                         file_attached, part.get_content_maintype())
            logging.debug('    Downloading\t: To "%s"', file_temp.name)
            payload = part.get_payload(decode=True)
            if payload is not None:
                file_temp.write(payload)
                # The file is copied by name below, so buffered data has
                # to reach the disk first; otherwise small attachments
                # would be stored empty or truncated.
                file_temp.flush()
                self.__copy_file(file_temp.name, file_attached)
            else:
                logging.warning('    Downloading\t: File "%s" was empty',
                                file_attached)

        return True

    def __copy_file(self, source, target_name, iterator=0):
        """Copy file, check for duplicates via hash value.

        If the target already exists with different content, ``-1``,
        ``-2``, ... is appended to the base name until a free name or an
        identical file is found.
        """

        target_base, target_extension = os.path.splitext(target_name)
        source_hash = None  # computed lazily, at most once

        while True:
            suffix = "-" + str(iterator) if iterator > 0 else ""
            target = os.path.join(self.args.target,
                                  target_base + suffix + target_extension)
            if iterator == 0:
                logging.debug('    Moving\t: From "%s" to "%s".',
                              source, target)

            if not os.path.isfile(target):
                shutil.copy2(source, target)
                return

            if source_hash is None:
                source_hash = self.get_hash(source)
            if source_hash == self.get_hash(target):
                logging.debug('    Moving\t: Already exists (same hash)')
                return

            if iterator == 0:
                logging.debug(
                    '    Conflict\t: Resolving same file / other hash...')
            iterator += 1

    def detach_attachment(self, msg):
        """Replace large attachment with dummy text.

        ``msg`` (a single message part) is modified in place: its headers
        are rewritten to describe a plain text part and its payload is
        replaced by the placeholder text.
        """

        # Get message details
        msg_content = msg.get_content_type()
        msg_filename = self.convert_filename(msg.get_filename())
        msg_size = len(str(msg)) / 1024
        msg_type = msg.get_content_disposition()

        # Remove some old headers
        del msg['Content-Transfer-Encoding']
        del msg['Content-Disposition']
        del msg['Content-Description']
        # get_params() returns None when there is no Content-Type header;
        # the first entry is the content type itself and is kept.
        for k, _v in (msg.get_params() or [])[1:]:
            msg.del_param(k)

        # Make sure different clients visualize the removed content properly
        msg.set_type('text/plain')
        msg.set_charset('utf-8')
        if msg_type == 'attachment':
            msg.add_header('Content-Disposition', 'inline')
        else:
            msg.add_header('Content-Disposition', 'attachment',
                           filename='removed-%s.txt' % msg_filename)
            msg.add_header('Content-Description',
                           'removed-%s.txt' % msg_filename)

        # Replace content
        msg_details = dict(type=msg_content,
                           filename=msg_filename,
                           size=msg_size)
        msg_placeholder = self.__PLACEHOLDER % msg_details
        msg_placeholder = email.mime.text.MIMEText(msg_placeholder,
                                                   'text', 'utf-8')
        msg.set_payload(msg_placeholder.get_payload())

    @staticmethod
    def get_mail_from_struct(data) -> email.message.Message:
        """Convert the raw body of a FETCH response into a message object.

        The body is read from ``data[0][1]``; bytes that are not valid
        UTF-8 are decoded as ISO-8859-1, which accepts every byte value.
        """

        raw_bytes = data[0][1]
        try:
            raw_email = raw_bytes.decode('utf-8')
        except UnicodeDecodeError:
            raw_email = raw_bytes.decode('iso-8859-1')

        return email.message_from_string(raw_email)

    @staticmethod
    def _decode_header_value(value) -> str:
        """Decode a header value, joining all of its encoded chunks."""

        chunks = []
        for text, charset in email.header.decode_header(value):
            if isinstance(text, bytes):
                try:
                    text = text.decode(charset or 'utf-8', errors='replace')
                except LookupError:  # charset unknown to Python
                    text = text.decode('utf-8', errors='replace')
            chunks.append(text)
        return ''.join(chunks)

    @staticmethod
    def get_subject(message) -> str:
        """Get shortened message subject for visualization.

        Returns:
            The decoded subject, cut after 75 characters (followed by
            ``...``), or ``"unknown"`` if the message has no subject.
        """

        if 'subject' in message:
            subject = message['subject']
        else:
            subject = "unknown"  # very rarely messages have no subject
        subject = MailboxCleaner._decode_header_value(subject)
        subject = subject[:75] + (subject[75:] and '...')
        subject = subject.replace('\r\n', '')
        subject = subject.replace('\t', ' ')

        return subject

    @staticmethod
    def get_hash(filename: str) -> str:
        """Get the SHA-256 hex digest of a file to detect duplicates."""

        hash_value = hashlib.sha256()
        with open(filename, "rb") as file:
            # Read in blocks so that large files need not fit into memory.
            for byte_block in iter(lambda: file.read(4096), b""):
                hash_value.update(byte_block)
        return hash_value.hexdigest()

    @staticmethod
    def convert_date(date):
        """Convert dates to copy old date to new message.

        Returns:
            The date as an IMAP INTERNALDATE string, or ``None`` if the
            header is missing or cannot be parsed.
        """

        if date is None:
            return None
        try:
            pz_time = email.utils.parsedate_tz(str(date))
            if pz_time is None:
                return None
            stamp = email.utils.mktime_tz(pz_time)
            return imaplib.Time2Internaldate(stamp)
        except (TypeError, ValueError, OverflowError):
            return None

    @staticmethod
    def get_flags_from_struct(data):
        """Get flags to copy old flags to new message.

        Every element of the FETCH response is searched, because the
        FLAGS item can be in front of or behind the message body.

        Returns:
            The flags as a space separated string without ``\\Recent``.
        """

        for item in data:
            header = item[0] if isinstance(item, tuple) else item
            if not isinstance(header, bytes):
                continue
            flags = imaplib.ParseFlags(header)
            if flags:
                # \Recent is a read-only attribute and cannot be set.
                kept = [flag for flag in flags
                        if flag.lower() != b'\\recent']
                return b" ".join(kept).decode("utf-8")

        return ""

    @staticmethod
    def slugify_filename(value):
        """Make sure attachments contain only valid characters.

        Everything except dots, word characters, whitespace and hyphens
        is replaced by an underscore.
        """

        value = str(value)
        value = unicodedata.normalize('NFKC', value)
        value = re.sub(r'[^.\w\s-]', '_', value)
        return value

    @staticmethod
    def convert_filename(file_struct) -> str:
        """Decode the name of some attachments.

        Returns:
            The decoded and slugified filename, or ``"unknown"`` if
            ``file_struct`` is ``None``.
        """

        if file_struct is None:
            filename = 'unknown'
        else:
            filename = MailboxCleaner._decode_header_value(file_struct)

        return MailboxCleaner.slugify_filename(filename)
504
505
506
def handle_arguments(argv=None) -> argparse.Namespace:
    """Provide CLI handler for application.

    Parses the command line and configures the root logger according to
    the requested verbosity.

    Args:
        argv: list of argument strings to parse; ``None`` (the default)
            parses the arguments of the running process.

    Returns:
        The namespace holding the parsed arguments.
    """

    parser = argparse.ArgumentParser()
    parser.add_argument("-a", "--all",
                        help="iterate over all folders", action='store_true')
    parser.add_argument("-d", "--detach",
                        help="remove attachments", action='store_true')
    parser.add_argument("-k", "--skip-download",
                        help="do not download attachments",
                        action='store_true')
    parser.add_argument("-r", "--reset-cache",
                        help="reset cache", action='store_true')
    # type=int: without it a value given on the command line stays a
    # string and cannot be compared with the attachment size.
    parser.add_argument("-m", "--max-size",
                        help="max attachment size in KB", default=200,
                        type=int)
    parser.add_argument("-f", "--folder",
                        help="imap folder to process", default="Inbox")
    parser.add_argument("-l", "--upload",
                        help="local folder with messages to upload")

    parser.add_argument("-t", "--target",
                        help="download attachments to this local folder",
                        default="attachments")
    parser.add_argument("-s", "--server", help="imap server", required=True)
    parser.add_argument("-u", "--user", help="imap user", required=True)
    parser.add_argument("-p", "--password", help="imap password",
                        required=True)
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        default=0,
        dest="verbosity",
        help="be more verbose (-v, -vv)")
    parser.add_argument(
        "--version",
        action="version",
        version="%(prog)s (version {version})".format(version=__version__))

    args = parser.parse_args(argv)

    # Each -v lowers the threshold by one level, but never below DEBUG.
    level = max(logging.DEBUG, logging.WARNING - args.verbosity * 10)
    logging.basicConfig(level=level, format="%(message)s")

    return args
549
550
551
def main():
    """Parse the command line, build the cleaner and start it."""

    MailboxCleaner(handle_arguments()).run()
557
558
559
# Start the command-line tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
560
    main()
561