Passed
Push — main ( 902473...2e83d2 )
by Alexander
01:28
created

src.mailbox_imap   B

Complexity

Total Complexity 50

Size/Duplication

Total Lines 338
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 213
dl 0
loc 338
rs 8.4
c 0
b 0
f 0
wmc 50

16 Methods

Rating   Name   Duplication   Size   Complexity  
A MailboxCleanerIMAP.cleanup() 0 4 1
A MailboxCleanerIMAP.login() 0 14 4
A MailboxCleanerIMAP.__init__() 0 9 1
B MailboxCleanerIMAP.get_msg() 0 30 6
A MailboxCleanerIMAP.get_msgs_from_folder() 0 14 1
A MailboxCleanerIMAP._load_cache() 0 10 4
A MailboxCleanerIMAP.get_flags_from_struct() 0 9 2
A MailboxCleanerIMAP.get_folders() 0 20 2
A MailboxCleanerIMAP.upload() 0 29 4
B MailboxCleanerIMAP.process_folders() 0 43 6
A MailboxCleanerIMAP.logout() 0 14 3
A MailboxCleanerIMAP.does_msg_exist() 0 17 4
A MailboxCleanerIMAP.get_msg_from_struct() 0 13 3
A MailboxCleanerIMAP._save_cache() 0 5 2
A MailboxCleanerIMAP.convert_date() 0 8 1
B MailboxCleanerIMAP.replace_msg() 0 30 6

How to fix   Complexity   

Complexity

Complex classes like src.mailbox_imap often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
"""
5
Module to download and to detach/strip/remove attachments
6
from e-mails on IMAP servers.
7
"""
8
9
from __future__ import print_function
10
11
import email
12
import email.mime.text
13
import email.utils
14
import email.parser
15
import imaplib
16
import logging
17
import socket
18
import time
19
import typing
20
import collections
21
import os.path
22
import pickle
23
24
from src.mailbox_message import MailboxCleanerMessage
25
26
# Override imaplib's private per-line size limit with 10,000,000.
# NOTE(review): presumably needed so very long server response lines are
# accepted instead of being rejected by imaplib — confirm.
imaplib._MAXLINE = 10000000  # pylint: disable=protected-access
27
28
29
# Module metadata (authorship, license, version, status).
__author__ = "Alexander Willner"
30
__copyright__ = "Copyright 2020, Alexander Willner"
31
__credits__ = ["github.com/guido4000",
32
               "github.com/halteproblem", "github.com/jamesridgway"]
33
__license__ = "MIT"
34
__version__ = "1.0.0"
35
__maintainer__ = "Alexander Willner"
36
__email__ = "[email protected]"
38
39
40
class MailboxCleanerIMAP():
    """
    Download and detach/strip/remove attachments from e-mails
    on IMAP servers.

    The class wraps an ``imaplib`` connection, iterates over the configured
    folders, hands every message to ``MailboxCleanerMessage`` and, when the
    message was modified, uploads the new version and deletes the old one.
    Processed messages are remembered in a pickled cache file.
    """

    # Number of retries to get messages
    __RETRIES = 2

    # IMAP folders to ignore
    __IGNORE_PREFIX = ('Contacts', 'Calendar', '"Calendar',
                       'Trash', '"Deleted', 'Tasks',
                       '"[Gmail]"')

    def __init__(self, args, imap=None):
        """
        Initialize class.

        :param args: parsed options; this class reads ``server``, ``user``,
            ``password``, ``folder``, ``all``, ``detach``, ``upload`` and
            ``reset_cache`` from it.
        :param imap: optional, already created IMAP connection. When it is
            ``None`` a new ``imaplib.IMAP4_SSL`` is created in ``login()``.
        """

        self.args = args
        # Without --detach nothing on the server may be modified
        self.readonly = not self.args.detach
        self.message = MailboxCleanerMessage(args)
        self.cache = collections.OrderedDict()
        self.cache_file = args.server + '_cache.pkl'
        self.imap: imaplib.IMAP4_SSL = imap

    def cleanup(self):
        """Cleanup after error: persist the cache to disk."""

        self._save_cache()

    def login(self):
        """
        Log into the IMAP server and load the cache.

        :raises SystemExit: if the server name cannot be resolved or the
            server rejects the login.
        """

        try:
            if self.imap is None:
                self.imap = imaplib.IMAP4_SSL(self.args.server)
            self.imap.login(self.args.user, self.args.password)
        except socket.gaierror as error:
            raise SystemExit('Login failed (wrong server?): %s' %
                             error) from error
        except imaplib.IMAP4.error as error:
            raise SystemExit('Login failed (wrong password?): %s' %
                             error) from error

        # Kept outside the try block: cache problems are not login failures
        self._load_cache()

    def logout(self):
        """
        Log out of the IMAP server.

        Both steps are best effort: a missing connection or a server side
        error while closing/logging out is ignored on purpose.
        """

        try:
            self.imap.close()
            logging.warning('Connection\t: Closed')
        except (AttributeError, imaplib.IMAP4.error):
            pass

        try:
            self.imap.logout()
            logging.warning('Connection\t: Logged Out')
        except (AttributeError, imaplib.IMAP4.error):
            pass

    def does_msg_exist(self, msg, folder=None) -> bool:
        """
        Check if message is already on the server.

        :param msg: message to look for (searched by its Message-ID).
        :param folder: folder to search in; defaults to ``args.folder``.
        :return: ``True`` only if a matching, undeleted message was found
            and the upload option is set; ``False`` otherwise.
        """

        target = self.args.folder if folder is None else folder
        msg_uid = self.message.get_uid(msg)
        self.imap.select(target, readonly=True)
        status, data = self.imap.uid('SEARCH', None,
                                     '(HEADER Message-ID "%s") UNDELETED'
                                     % msg_uid)

        # An empty or missing search result means "not found"
        found = bool(data) and bool(data[0])
        if found and self.args.upload is not None:
            logging.warning('    Duplicate\t: %s', status)
            # The subject is taken from the message, not from its id
            self.cache[msg_uid] = self.message.get_subject(msg)
            return True

        return False

    def process_folders(self):
        """
        Iterate over mails in configured folders.

        Every message that is not in the cache is fetched, stripped of its
        attachments and, if it was modified, replaced on the server.
        Messages that cannot be fetched are skipped.
        """

        folders = self.get_folders()

        # Iterate over each folder
        for i, folder in enumerate(folders, start=1):

            # Get all mails in this folder
            logging.info('Progress\t: %s / %s (folders)', i, len(folders))
            logging.warning('Folder\t\t: %s (started)', folder)
            msg_uids = self.get_msgs_from_folder(folder)

            # Iterate over each email
            for j, msg_uid in enumerate(msg_uids, start=1):

                # Skip if already in cache
                # NOTE(review): the cache is keyed by the bare IMAP UID here,
                # which is only unique within one folder, so entries of
                # different folders can collide when all folders are
                # processed. upload() keys the same cache by Message-ID.
                # Changing the key would invalidate existing cache files,
                # so this is only flagged, not changed.
                logging.info('Progress\t: %s / %s (mail uid: %s)',
                             j, len(msg_uids), msg_uid.decode())
                if msg_uid in self.cache:
                    logging.info('  Subject\t: %s (cached)',
                                 self.cache[msg_uid])
                    continue

                # Get the actual email
                try:
                    msg, msg_flags = self.get_msg(msg_uid)
                except imaplib.IMAP4.error:
                    logging.info('  Error\t: Message %s skipped', msg_uid)
                    continue
                subject = self.message.get_subject(msg)
                logging.info('  Subject\t: %s', subject)

                # Download and detach attachments from email
                modified = self.message.download_and_detach_attachments(msg)

                # Upload new email
                if modified:
                    self.replace_msg(msg, msg_flags, msg_uid, folder)

                self.cache[msg_uid] = subject

            logging.warning('Folder\t\t: %s (completed)', folder)

    def replace_msg(self, msg, msg_flags, msg_uid, folder):
        """
        Upload new message and remove the old one.

        :param msg: the modified message to upload.
        :param msg_flags: IMAP flags to set on the uploaded message.
        :param msg_uid: IMAP UID of the old message inside ``folder``.
        :param folder: folder that contains the old message; the new message
            is uploaded into the same folder.
        :raises imaplib.IMAP4.error: if ``folder`` cannot be selected for
            writing after the upload.
        """

        # Only upload in non-readonly mode
        if self.readonly:
            logging.debug('    Detaching\t: skipped (read-only mode)')
            return

        # Upload new message into the folder the old one lives in
        status, data = self.upload(msg, msg_flags, folder)
        if status != 'OK':
            logging.warning('    Error\t: "%s"', data)
            return

        # Delete old message. The folder has to be selected again because
        # the duplicate check inside upload() selects it read-only.
        result = self.imap.select(folder, readonly=False)
        if result[0] != 'OK':
            # Never continue: STORE would act on whatever mailbox is selected
            raise imaplib.IMAP4.error(
                'Could not select folder %s: %s' % (folder, result))
        result = self.imap.uid('STORE', msg_uid, '+FLAGS', '\\Deleted')
        logging.debug('    Deleting\t: %s', result)

        # GMail needs special treatment
        try:
            self.imap.uid('STORE', msg_uid, '+X-GM-LABELS', '\\Trash')
        except imaplib.IMAP4.error:
            pass

        # Sometimes expunge just fails with an EOF socket error
        try:
            self.imap.expunge()
            logging.debug('    Comment\t: Expunged')
        except imaplib.IMAP4.abort:
            pass

    def upload(self, msg, msg_flags='\\Seen', folder=None):
        """
        Upload message to server.

        :param msg: message to upload.
        :param msg_flags: IMAP flags for the uploaded message.
        :param folder: destination folder; defaults to ``args.folder``.
        :return: tuple ``(status, data)``. ``status`` is ``'Cached'`` or
            ``'Duplicate'`` when nothing was uploaded, otherwise the status
            returned by the server for the APPEND command.
        """

        target = self.args.folder if folder is None else folder

        # Knowing what's going on
        msg_date = self.convert_date(msg.get('date'))
        msg_subject = self.message.get_subject(msg)
        msg_uid = self.message.get_uid(msg)
        logging.debug('    Uploading\t: %s / %s', msg_date, msg_flags)

        # Check cache
        if msg_uid in self.cache:
            logging.warning('    Cache\t: OK')
            return ('Cached', '')

        # Check for duplicates
        if self.does_msg_exist(msg, target):
            self.cache[msg_uid] = msg_subject
            return ('Duplicate', '')

        status, data = self.imap.append(
            target, msg_flags, msg_date, msg.as_string().encode())
        if status == "OK":
            logging.warning('    Success\t: %s', status)
            self.cache[msg_uid] = msg_subject
        else:
            logging.warning('    Error\t\t: %s', data)

        return status, data

    def get_msg(self, uid):
        """
        Fetch an email from the IMAP server.

        :param uid: IMAP UID of the message in the selected folder.
        :return: tuple ``(message, flags)``.
        :raises imaplib.IMAP4.error: if no message body could be fetched
            after all retries.
        """

        # Sometimes IMAP servers might return empty bodies, so try again
        for _ in range(self.__RETRIES):
            try:
                result, data = self.imap.uid('fetch', uid,
                                             '(UID BODY.PEEK[] FLAGS)')
            except imaplib.IMAP4.error:
                continue

            if not data or data[0] is None:
                logging.warning('  Error\t: '
                                'Could not get a message body. '
                                'Retrying in a few seconds...')
                time.sleep(2)
                continue

            body = data[0][1]
            logging.debug('  Result (Size)\t: %s (%d KB)',
                          result, len(body) / 1024)

            msg = self.get_msg_from_struct(data)
            msg_flags = self.get_flags_from_struct(data)

            logging.debug('  Flags\t\t: %s', msg_flags)

            return (msg, msg_flags)

        raise imaplib.IMAP4.error('Could not get a message body')

    def get_msgs_from_folder(self, folder):
        """
        Get all emails from a folder on the IMAP server.

        :param folder: folder to select and search.
        :return: list of message UIDs (bytes); empty if the search returned
            nothing.
        """

        # Safety net: enable read-only if requested
        logging.warning('Read Only\t: %s', self.readonly)
        self.imap.select(folder, readonly=self.readonly)

        # Extract email UIDs
        result_mails, data_mails = self.imap.uid('search', None, "ALL")
        if data_mails and data_mails[0]:
            msg_uids = data_mails[0].split()
        else:
            msg_uids = []
        logging.warning('Mails (#)\t: %s (%s)',
                        result_mails, len(msg_uids))

        return msg_uids

    def get_folders(self) -> typing.List[str]:
        """
        Get the folders from the IMAP server to iterate through.

        :return: only the configured folder unless all folders were
            requested; otherwise every listed folder whose name does not
            start with one of the ignored prefixes.
        """

        res, folder_list = self.imap.list()

        logging.warning('Folders (#)\t: %s (%s)', res, len(folder_list))
        logging.warning('All Folders\t: %s', self.args.all)

        if not self.args.all:
            return [self.args.folder]

        # NOTE(review): the folder name is taken as the text after the "/"
        # hierarchy delimiter; servers announcing another delimiter are
        # presumably not handled - confirm.
        names = (item.decode().split('"/"')[-1].strip()
                 for item in folder_list)

        return [name for name in names
                if not name.startswith(self.__IGNORE_PREFIX)]

    @staticmethod
    def convert_date(date):
        """
        Convert dates to copy old date to new message.

        :param date: value of the message's Date header (may be ``None``).
        :return: the date as IMAP INTERNALDATE string (in double quotes).
            If the header is missing or cannot be parsed the current time
            is used instead of failing.
        """

        try:
            stamp = email.utils.mktime_tz(email.utils.parsedate_tz(date))
        except (TypeError, ValueError, OverflowError):
            logging.debug('    Date\t: "%s" not usable, using now', date)
            stamp = time.time()

        return imaplib.Time2Internaldate(stamp)

    @staticmethod
    def get_msg_from_struct(data) -> 'email.message.Message':
        """
        Convert the fetched raw message into a message object.

        :param data: data part of a FETCH response; ``data[0][1]`` holds
            the raw message bytes.
        :return: the parsed message.
        """

        raw_bytes = data[0][1]
        try:
            raw_email = raw_bytes.decode('utf-8')
        except UnicodeDecodeError:
            # ISO-8859-1 maps every byte, so this fallback cannot fail
            raw_email = raw_bytes.decode('iso-8859-1')

        return email.message_from_string(raw_email)

    @staticmethod
    def get_flags_from_struct(data):
        """
        Get flags to copy old flags to new message.

        :param data: data part of a FETCH response.
        :return: the flags separated by single spaces, without ``\\Recent``;
            an empty string if the response contains no flags.
        """

        # Look at the part after the body first; fall back to the other
        # parts because FLAGS may also be sent in front of the body literal.
        parts = list(data[1:2]) + list(data[:1]) + list(data[2:])

        flags = ()
        for part in parts:
            header = part[0] if isinstance(part, tuple) else part
            if isinstance(header, bytes):
                flags = imaplib.ParseFlags(header)
                if flags:
                    break

        # \Recent is a read-only attribute and must not be uploaded
        return ' '.join(flag.decode('utf-8') for flag in flags
                        if flag != b'\\Recent')

    def _load_cache(self):
        """
        Load cache of processed mail UIDs with their subjects.

        A new cache file is written first if none exists or a reset was
        requested. An unreadable cache file results in an empty cache.
        """

        # Create new cache if needed
        if self.args.reset_cache or not os.path.exists(self.cache_file):
            self._save_cache()

        # SECURITY: pickle must only be used on trusted files; the cache
        # file is expected to be written by this program only.
        try:
            with open(self.cache_file, 'rb') as filepointer:
                self.cache = pickle.load(filepointer)
        except (EOFError, pickle.UnpicklingError) as error:
            logging.warning('Cache\t\t: unreadable, starting empty (%s)',
                            error)
            self.cache = collections.OrderedDict()

    def _save_cache(self):
        """Save cache of processed mail UIDs with their subjects."""

        with open(self.cache_file, 'wb+') as filepointer:
            pickle.dump(self.cache, filepointer, pickle.HIGHEST_PROTOCOL)
338