Passed
Push — main ( 242f40...8b1e40 )
by Alexander
01:29
created

src.mailbox_imap   B

Complexity

Total Complexity 46

Size/Duplication

Total Lines 328
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 204
dl 0
loc 328
rs 8.72
c 0
b 0
f 0
wmc 46

16 Methods

Rating   Name   Duplication   Size   Complexity  
A MailboxCleanerIMAP.cleanup() 0 4 1
B MailboxCleanerIMAP.get_msg() 0 30 6
A MailboxCleanerIMAP.get_msgs_from_folder() 0 14 1
A MailboxCleanerIMAP._load_cache() 0 10 4
A MailboxCleanerIMAP.get_flags_from_struct() 0 9 2
A MailboxCleanerIMAP.get_folders() 0 20 2
A MailboxCleanerIMAP.upload() 0 30 4
B MailboxCleanerIMAP.process_folders() 0 43 6
A MailboxCleanerIMAP.login() 0 13 3
A MailboxCleanerIMAP.logout() 0 12 3
A MailboxCleanerIMAP.__init__() 0 9 1
A MailboxCleanerIMAP.does_msg_exist() 0 13 2
A MailboxCleanerIMAP.get_msg_from_struct() 0 13 3
A MailboxCleanerIMAP._save_cache() 0 5 2
A MailboxCleanerIMAP.convert_date() 0 8 1
A MailboxCleanerIMAP.replace_msg() 0 26 5

How to fix   Complexity   

Complexity

Complex classes like src.mailbox_imap often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
"""
5
Module to download and to detach/strip/remove attachments
6
from e-mails on IMAP servers.
7
"""
8
9
from __future__ import print_function
10
11
import email
12
import email.mime.text
13
import email.utils
14
import email.parser
15
import imaplib
16
import logging
17
import socket
18
import time
19
import typing
20
import collections
21
import os.path
22
import pickle
23
24
from src.mailbox_message import MailboxCleanerMessage
25
26
# Raise imaplib's limit on the length of a single line read from the server
# (module-wide side effect at import time); presumably needed because large
# mailboxes/messages produce very long response lines -- TODO confirm.
imaplib._MAXLINE = 10000000  # pylint: disable=protected-access
27
28
29
__author__ = "Alexander Willner"
30
__copyright__ = "Copyright 2020, Alexander Willner"
31
__credits__ = ["github.com/guido4000",
32
               "github.com/halteproblem", "github.com/jamesridgway"]
33
__license__ = "MIT"
34
__version__ = "1.0.0"
35
__maintainer__ = "Alexander Willner"
36
__email__ = "[email protected]"
37
__status__ = "Development"
38
39
40
class MailboxCleanerIMAP():
    """
    Download and detach/strip/remove attachments from e-mails
    on IMAP servers.

    The class wraps one ``imaplib.IMAP4_SSL`` connection and a small
    on-disk cache (a pickled dictionary) of already processed messages.
    The ``args`` object handed to the constructor must provide the
    attributes ``server``, ``user``, ``password``, ``folder``, ``all``,
    ``detach`` and ``reset_cache``.
    """

    # Number of retries to get messages
    __RETRIES = 2

    # IMAP folders to ignore
    __IGNORE_PREFIX = ('Contacts', 'Calendar', '"Calendar',
                       'Trash', '"Deleted', 'Tasks',
                       '"[Gmail]"')

    def __init__(self, args):
        """Initialize class.

        No network or file access happens here; see ``login()``.
        """

        self.args = args
        # Without --detach nothing on the server may be modified.
        self.readonly = not self.args.detach
        self.message = MailboxCleanerMessage(args)
        self.cache = collections.OrderedDict()
        self.cache_file = args.server + '_cache.pkl'
        self.imap: typing.Optional[imaplib.IMAP4_SSL] = None
        # Folder selected by get_msgs_from_folder(); used to restore the
        # selection after does_msg_exist() had to look into another folder.
        self._selected_folder = None

    def cleanup(self):
        """Cleanup after error (persists the cache to disk)."""

        self._save_cache()

    def login(self):
        """Log into the IMAP server and load the cache.

        Raises:
            SystemExit: if the server name cannot be resolved or the
                server rejects the login.
        """

        try:
            self.imap = imaplib.IMAP4_SSL(self.args.server)
            self.imap.login(self.args.user, self.args.password)
            self._load_cache()
        except socket.gaierror as error:
            raise SystemExit('Login failed (wrong server?): %s' %
                             error) from error
        except imaplib.IMAP4.error as error:
            raise SystemExit('Login failed (wrong password?): %s' %
                             error) from error

    def logout(self):
        """Log out of the IMAP server.

        Best effort: protocol errors while closing are ignored, and the
        call is a no-op when no connection was ever established.
        """

        if self.imap is None:
            return

        # close() fails if no mailbox is selected; that is fine here.
        try:
            self.imap.close()
        except imaplib.IMAP4.error:
            pass

        try:
            self.imap.logout()
        except imaplib.IMAP4.error:
            pass

    def does_msg_exist(self, msg) -> bool:
        """Check if message is already on the server.

        Searches ``args.folder`` for a message whose Message-ID header
        matches ``self.message.get_uid(msg)``. A hit is also recorded in
        the cache.

        Returns:
            True if the search succeeded and returned at least one UID.
        """

        msg_uid = self.message.get_uid(msg)
        self.imap.select(self.args.folder, readonly=True)
        status, data = self.imap.uid('SEARCH',
                                     '(HEADER Message-ID "%s")' % msg_uid)

        # The read-only select above replaced the caller's mailbox
        # selection; put it back so later UID STORE / EXPUNGE commands
        # act on the folder that is being processed.
        if self._selected_folder is not None:
            self.imap.select(self._selected_folder, readonly=self.readonly)

        # imaplib answers a search without hits with [b''], so "data is
        # not None" is not enough to detect a match.
        if status == 'OK' and data and data[0]:
            logging.warning('    Duplicate\t: %s', status)
            self.cache[msg_uid] = self.message.get_subject(msg)
            return True

        return False

    def process_folders(self):
        """Iterate over mails in configured folders.

        For every message that is not in the cache: fetch it, let
        ``self.message`` download/detach its attachments and, if the
        message was modified, replace it on the server.
        """

        folders = self.get_folders()

        # Iterate over each folder
        for i, folder in enumerate(folders, start=1):

            # Get all mails in this folder
            logging.info('Progress\t: %s / %s (folders)', i, len(folders))
            logging.warning('Folder\t\t: %s (started)', folder)
            msg_uids = self.get_msgs_from_folder(folder)

            # Iterate over each email
            for j, msg_uid in enumerate(msg_uids, start=1):

                # Skip if already in cache
                # NOTE(review): the key is the bare IMAP UID (bytes), which
                # is only unique within one folder, and upload() stores
                # get_uid() values in the same dictionary -- confirm that
                # this mix is intended.
                logging.info('Progress\t: %s / %s (mail uid: %s)',
                             j, len(msg_uids), msg_uid.decode())
                if msg_uid in self.cache:
                    logging.info('  Subject\t: %s (cached)',
                                 self.cache[msg_uid])
                    continue

                # Get the actual email
                try:
                    msg, msg_flags = self.get_msg(msg_uid)
                except imaplib.IMAP4.error:
                    logging.info('  Error\t: Message %s skipped', msg_uid)
                    continue
                subject = self.message.get_subject(msg)
                logging.info('  Subject\t: %s', subject)

                # Download and detach attachments from email
                modified = self.message.download_and_detach_attachments(msg)

                # Upload new email
                if modified:
                    self.replace_msg(msg, msg_flags, msg_uid)

                self.cache[msg_uid] = subject

            logging.warning('Folder\t\t: %s (completed)', folder)

    def replace_msg(self, msg, msg_flags, msg_uid):
        """Upload new message and remove the old one.

        Args:
            msg: the (already stripped) message to upload.
            msg_flags: IMAP flags to set on the uploaded message.
            msg_uid: IMAP UID of the old message in the selected folder.

        The old message is only deleted when the upload reported ``OK``.
        """

        # Only upload in non-readonly mode
        if self.readonly:
            logging.debug('    Detaching\t: skipped (read-only mode)')
            return

        # Upload new message
        # NOTE(review): upload() refuses messages that does_msg_exist()
        # finds in args.folder; whether the not-yet-deleted original counts
        # as such a duplicate depends on MailboxCleanerMessage.get_uid().
        status, data = self.upload(msg, msg_flags)
        if status != 'OK':
            logging.warning('    Error\t: "%s"', data)
            return

        # Delete old message
        self.imap.uid('STORE', msg_uid, '+FLAGS', '\\Deleted')

        # GMail needs special treatment; other servers reject the
        # X-GM-LABELS extension, which is expected and ignored.
        try:
            self.imap.uid('STORE', msg_uid, '+X-GM-LABELS', '\\Trash')
        except imaplib.IMAP4.error:
            pass

        # Sometimes expunge just fails with an EOF socket error
        try:
            self.imap.expunge()
        except imaplib.IMAP4.abort as error:
            logging.debug('    Expunge\t: aborted (%s)', error)

    def upload(self, msg, msg_flags='\\Seen'):
        """Upload message to server.

        The message is appended to ``args.folder`` unless it is already
        in the cache or already present on the server.

        Returns:
            A ``(status, data)`` tuple: ``('Cached', '')``,
            ``('Duplicate', '')`` or the result of ``IMAP4.append``.
        """

        # Knowing what's going on
        msg_date = self.convert_date(msg.get('date'))
        msg_subject = self.message.get_subject(msg)
        msg_uid = self.message.get_uid(msg)
        logging.debug('    Uploading\t: %s / %s', msg_date, msg_flags)

        # Check cache
        if msg_uid in self.cache:
            logging.warning('    Cache\t: OK')
            return ('Cached', '')

        # Check for duplicates
        if self.does_msg_exist(msg):
            self.cache[msg_uid] = msg_subject
            return ('Duplicate', '')

        # A msg_date of None lets the server assign the current time.
        status, data = self.imap.append(
            self.args.folder, msg_flags, msg_date, msg.as_string().encode())
        if status == "OK":
            logging.warning('    Success\t: %s', status)
            self.cache[msg_uid] = msg_subject
        else:
            logging.warning('    Error\t\t: %s', data)

        return status, data

    def get_msg(self, uid):
        """Fetch an email from the IMAP server.

        Returns:
            A ``(message, flags)`` tuple, see ``get_msg_from_struct()``
            and ``get_flags_from_struct()``.

        Raises:
            imaplib.IMAP4.error: if no message body could be fetched
                after all retries.
        """

        # Sometimes IMAP servers might return empty bodies, so try again
        for attempt in range(1, self.__RETRIES + 1):
            try:
                result, data = self.imap.uid('fetch', uid,
                                             '(UID BODY.PEEK[] FLAGS)')
            except imaplib.IMAP4.error:
                continue

            # A usable answer starts with a (header, body) tuple.
            if not data or not isinstance(data[0], tuple):
                if attempt < self.__RETRIES:
                    logging.warning('  Error\t: '
                                    'Could not get a message body. '
                                    'Retrying in a few seconds...')
                    time.sleep(2)
                continue

            body = data[0][1]
            logging.debug('  Result (Size)\t: %s (%d KB)',
                          result, len(body) / 1024)

            msg = self.get_msg_from_struct(data)
            msg_flags = self.get_flags_from_struct(data)

            logging.debug('  Flags\t\t: %s', msg_flags)

            return (msg, msg_flags)

        raise imaplib.IMAP4.error('Could not get a message body')

    def get_msgs_from_folder(self, folder):
        """Get all emails from a folder on the IMAP server.

        Selects ``folder`` (read-only unless detaching is enabled) and
        returns the UIDs of all its messages as a list of bytes.
        """

        # Safety net: enable read-only if requested
        logging.warning('Read Only\t: %s', self.readonly)
        self.imap.select(folder, readonly=self.readonly)
        self._selected_folder = folder

        # Extract email UIDs
        result_mails, data_mails = self.imap.uid('search', None, "ALL")
        msg_uids = data_mails[0].split()
        logging.warning('Mails (#)\t: %s (%s)',
                        result_mails, len(msg_uids))

        return msg_uids

    def get_folders(self) -> typing.List[str]:
        """Get the folders from the IMAP server to iterate through.

        Returns only ``args.folder`` unless ``args.all`` is set; in that
        case all listed folders except those starting with one of the
        ignored prefixes are returned.
        """

        res, folder_list = self.imap.list()

        logging.warning('Folders (#)\t: %s (%s)', res, len(folder_list))
        logging.warning('All Folders\t: %s', self.args.all)

        if not self.args.all:
            return [self.args.folder]

        # Every LIST line ends with the folder name after the '"/"'
        # hierarchy delimiter; imaplib reports "no folders" as [None].
        names = (item.decode().split('"/"')[-1].strip()
                 for item in folder_list if item)

        return [name for name in names
                if not name.startswith(self.__IGNORE_PREFIX)]

    @staticmethod
    def convert_date(date):
        """Convert dates to copy old date to new message.

        Args:
            date: value of an RFC 2822 ``Date`` header, or None.

        Returns:
            The date as IMAP INTERNALDATE string, or None if the header
            is missing or cannot be parsed.
        """

        pz_time = email.utils.parsedate_tz(date) if date else None
        if pz_time is None:
            return None

        stamp = email.utils.mktime_tz(pz_time)
        return imaplib.Time2Internaldate(stamp)

    @staticmethod
    def get_msg_from_struct(data) -> 'email.message.Message':
        """Convert the body of a FETCH response to a message object.

        ``data[0][1]`` must hold the raw message as bytes. It is decoded
        as UTF-8 and, if that fails, as ISO-8859-1.
        """

        raw_bytes = data[0][1]
        try:
            raw_email = raw_bytes.decode('utf-8')
        except UnicodeDecodeError:
            # ISO-8859-1 maps every byte, so this cannot fail.
            raw_email = raw_bytes.decode('iso-8859-1')

        return email.message_from_string(raw_email)

    @staticmethod
    def get_flags_from_struct(data):
        """Get flags to copy old flags to new message.

        Looks for the ``FLAGS (...)`` item in the non-literal parts of a
        FETCH response and returns the flags as one space separated
        string ('' if there are none). The ``Recent`` flag is dropped.
        """

        flags = ()
        for part in data:
            # Servers may send FLAGS before or after the body literal, so
            # check the tuple headers as well as the plain parts.
            header = part[0] if isinstance(part, tuple) else part
            if isinstance(header, bytes):
                flags = imaplib.ParseFlags(header)
                if flags:
                    break

        # \Recent is a read-only attribute and must not be set by clients
        return ' '.join(flag.decode('utf-8') for flag in flags
                        if flag.lower() != b'\\recent')

    def _load_cache(self):
        """Load cache of processed mail UIDs with their subjects."""

        # Create new cache if needed
        if self.args.reset_cache or not os.path.exists(self.cache_file):
            self._save_cache()

        # SECURITY: unpickling executes code embedded in the file; the
        # cache file must only ever be one written by _save_cache().
        with open(self.cache_file, 'rb') as filepointer:
            self.cache = pickle.load(filepointer)

    def _save_cache(self):
        """Save cache of processed mail UIDs with their subjects."""

        with open(self.cache_file, 'wb+') as filepointer:
            pickle.dump(self.cache, filepointer, pickle.HIGHEST_PROTOCOL)
328