Passed
Push — main ( 2e83d2...71d461 )
by Alexander
01:28
created

src.mailbox_imap   B

Complexity

Total Complexity 51

Size/Duplication

Total Lines 339
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 213
dl 0
loc 339
rs 7.92
c 0
b 0
f 0
wmc 51

16 Methods

Rating   Name   Duplication   Size   Complexity  
A MailboxCleanerIMAP.cleanup() 0 4 1
B MailboxCleanerIMAP.get_msg() 0 30 6
A MailboxCleanerIMAP.get_msgs_from_folder() 0 13 1
A MailboxCleanerIMAP._load_cache() 0 10 4
A MailboxCleanerIMAP.get_flags_from_struct() 0 9 2
A MailboxCleanerIMAP.get_folders() 0 19 2
B MailboxCleanerIMAP.upload() 0 33 5
B MailboxCleanerIMAP.process_folders() 0 43 6
A MailboxCleanerIMAP.login() 0 14 4
A MailboxCleanerIMAP.logout() 0 14 3
A MailboxCleanerIMAP.__init__() 0 8 1
A MailboxCleanerIMAP.does_msg_exist() 0 17 4
A MailboxCleanerIMAP.get_msg_from_struct() 0 13 3
A MailboxCleanerIMAP._save_cache() 0 5 2
A MailboxCleanerIMAP.convert_date() 0 8 1
B MailboxCleanerIMAP.replace_msg() 0 30 6

How to fix   Complexity   

Complexity

Complex classes like src.mailbox_imap often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
"""
5
Module to download and to detach/strip/remove attachments
6
from e-mails on IMAP servers.
7
"""
8
9
from __future__ import print_function
10
11
import email
12
import email.mime.text
13
import email.utils
14
import email.parser
15
import imaplib
16
import logging
17
import socket
18
import time
19
import typing
20
import collections
21
import os.path
22
import pickle
23
24
from src.mailbox_message import MailboxCleanerMessage
25
26
# Raise imaplib's internal limit on the length of a single response line
# (module-private attribute, hence the pylint override).
# NOTE(review): presumably needed so that large server responses are not
# rejected as over-long lines -- confirm.
imaplib._MAXLINE = 10000000  # pylint: disable=protected-access
27
28
29
__author__ = "Alexander Willner"
30
__copyright__ = "Copyright 2020, Alexander Willner"
31
__credits__ = ["github.com/guido4000",
32
               "github.com/halteproblem", "github.com/jamesridgway"]
33
__license__ = "MIT"
34
__version__ = "1.0.0"
35
__maintainer__ = "Alexander Willner"
36
__email__ = "[email protected]"
37
__status__ = "Development"
38
39
40
class MailboxCleanerIMAP():
    """
    Download and detach/strip/remove attachments from e-mails
    on IMAP servers.

    The class wraps an ``imaplib`` connection, iterates over the configured
    folders, hands every message to ``MailboxCleanerMessage`` and, when the
    message was modified, uploads the new version and flags the old one as
    deleted. Already processed messages are remembered in a pickled cache
    file named ``<server>_cache.pkl``.
    """

    # Number of attempts made to fetch a message before giving up
    __RETRIES = 2

    # IMAP folders to ignore (matched as name prefixes in get_folders)
    __IGNORE_PREFIX = ('Contacts', 'Calendar', '"Calendar',
                       'Trash', '"Deleted', 'Tasks',
                       '"[Gmail]"')

    def __init__(self, args, imap=None):
        """Initialize class.

        Args:
            args: Parsed settings; this class reads ``server``, ``user``,
                ``password``, ``folder``, ``all``, ``upload``, ``read_only``
                and ``reset_cache`` from it.
            imap: Optional, already created IMAP connection. When ``None``
                a new ``imaplib.IMAP4_SSL`` connection is opened in login().
        """

        self.args = args
        self.message = MailboxCleanerMessage(args)
        self.cache = collections.OrderedDict()
        self.cache_file = args.server + '_cache.pkl'
        self.imap: imaplib.IMAP4_SSL = imap

    def cleanup(self):
        """Cleanup after error by persisting the cache to disk."""

        self._save_cache()

    def login(self):
        """Log into the IMAP server and load the cache.

        Raises:
            SystemExit: If the server name cannot be resolved or the server
                rejects the login.
        """

        try:
            if self.imap is None:
                self.imap = imaplib.IMAP4_SSL(self.args.server)
            self.imap.login(self.args.user, self.args.password)
            self._load_cache()
        except socket.gaierror as error:
            raise SystemExit('Login failed (wrong server?): %s' %
                             error) from error
        except imaplib.IMAP4.error as error:
            raise SystemExit('Login failed (wrong password?): %s' %
                             error) from error

    def logout(self):
        """Log out of the IMAP server.

        Both steps are best effort: a missing connection (AttributeError)
        or an IMAP error is ignored on purpose.
        """

        try:
            self.imap.close()
            logging.warning('Connection\t: Closed')
        except (AttributeError, imaplib.IMAP4.error):
            pass

        try:
            self.imap.logout()
            logging.warning('Connection\t: Logged Out')
        except (AttributeError, imaplib.IMAP4.error):
            pass

    def does_msg_exist(self, msg) -> bool:
        """Check if message is already on the server.

        Searches ``args.folder`` for an undeleted message with the same
        Message-ID header. A hit is only reported (and cached) when
        ``args.upload`` is set.

        Args:
            msg: The message to look for.

        Returns:
            True if a duplicate was found in upload mode, otherwise False.
        """

        msg_uid = self.message.get_uid(msg)
        self.imap.select(self.args.folder, readonly=self.args.read_only)
        status, data = self.imap.uid('SEARCH', None,
                                     '(HEADER Message-ID "%s") UNDELETED'
                                     % msg_uid)

        # An empty result may show up as None, [], [None] or [b'']
        found = bool(data) and bool(data[0])

        if found and self.args.upload is not None:
            logging.warning('    Duplicate\t: %s', status)
            # The subject is derived from the message itself, as everywhere
            # else in this class (the id is only used as the cache key).
            self.cache[msg_uid] = self.message.get_subject(msg)
            return True

        return False

    def process_folders(self):
        """Iterate over mails in configured folders.

        For every message that is not cached yet: fetch it, let
        ``self.message`` download and detach its attachments, replace the
        message on the server if it was modified, and remember it in the
        cache.
        """

        folders = self.get_folders()

        # Iterate over each folder
        for i, folder in enumerate(folders, start=1):

            # Get all mails in this folder
            logging.info('Progress\t: %s / %s (folders)', i, len(folders))
            logging.warning('Folder\t\t: %s (started)', folder)
            msg_uids = self.get_msgs_from_folder(folder)

            # Iterate over each email
            for j, msg_uid in enumerate(msg_uids, start=1):

                # Skip if already in cache
                # NOTE(review): the cache is keyed here by the UID returned
                # by the folder search, while upload()/does_msg_exist() key
                # it by self.message.get_uid(msg). UIDs from different
                # folders could presumably collide -- confirm intended.
                logging.info('Progress\t: %s / %s (mail uid: %s)',
                             j, len(msg_uids), msg_uid.decode())
                if msg_uid in self.cache:
                    logging.info('  Subject\t: %s (cached)',
                                 self.cache[msg_uid])
                    continue

                # Get the actual email
                try:
                    msg, msg_flags = self.get_msg(msg_uid)
                except imaplib.IMAP4.error:
                    logging.info('  Error\t: Message %s skipped', msg_uid)
                    continue
                subject = self.message.get_subject(msg)
                logging.info('  Subject\t: %s', subject)

                # Download and detach attachments from email
                modified = self.message.download_and_detach_attachments(msg)

                # Upload new email
                if modified:
                    self.replace_msg(msg, msg_flags, msg_uid, folder)

                self.cache[msg_uid] = subject

            logging.warning('Folder\t\t: %s (completed)', folder)

    def replace_msg(self, msg, msg_flags, msg_uid, folder):
        """Upload new message and remove the old one.

        Args:
            msg: The (modified) message to upload.
            msg_flags: Flags to set on the uploaded message.
            msg_uid: UID of the old message inside ``folder``.
            folder: Folder that contains the old message.

        Raises:
            imaplib.IMAP4.error: If ``folder`` cannot be selected. Nothing
                is flagged as deleted in that case.
        """

        # Only upload in non-readonly mode
        if self.args.read_only:
            logging.debug('    Replacing\t: skipped (read-only)')
            return

        # Upload new message
        status, data = self.upload(msg, msg_flags)
        if status != 'OK':
            # Cached, duplicate or failed upload: keep the old message
            logging.warning('    Result\t: %s (%s)', status, data)
            return

        # Delete old message. The select result is checked explicitly (an
        # assert would vanish under "python -O"), because a STORE issued
        # without the right folder selected could flag an unrelated message.
        result = self.imap.select(folder, readonly=self.args.read_only)
        if result[0] != 'OK':
            raise imaplib.IMAP4.error(
                'Could not select folder %s: %s' % (folder, result))
        result = self.imap.uid('STORE', msg_uid, '+FLAGS', '\\Deleted')
        logging.debug('    Deleting\t: %s', result)

        # GMail needs special treatment; other servers may reject the
        # command, which is ignored on purpose
        try:
            self.imap.uid('STORE', msg_uid, '+X-GM-LABELS', '\\Trash')
        except imaplib.IMAP4.error:
            pass

        # Sometimes expunge just fails with an EOF socket error
        try:
            self.imap.expunge()
            logging.debug('    Comment\t: Expunged')
        except imaplib.IMAP4.abort as error:
            logging.debug('    Comment\t: Expunge aborted (%s)', error)

    def upload(self, msg, msg_flags='\\Seen'):
        """Upload message to server.

        Args:
            msg: The message to append to ``args.folder``.
            msg_flags: Flags to set on the uploaded message.

        Returns:
            A ``(status, data)`` tuple. ``status`` is ``'Read Only'``,
            ``'Cached'`` or ``'Duplicate'`` when nothing was uploaded,
            otherwise the status and data returned by the server.
        """

        # Nothing is written in read-only mode
        if self.args.read_only:
            logging.warning('    Uploading\t: skipped (read-only)')
            return ('Read Only', '')

        # Knowing what's going on
        msg_date = self.convert_date(msg.get('date'))
        msg_subject = self.message.get_subject(msg)
        msg_uid = self.message.get_uid(msg)
        logging.debug('    Uploading\t: %s / %s', msg_date, msg_flags)

        # Check cache
        if msg_uid in self.cache:
            logging.warning('    Cache\t: OK')
            return ('Cached', '')

        # Check for duplicates
        if self.does_msg_exist(msg):
            self.cache[msg_uid] = msg_subject
            return ('Duplicate', '')

        # msg_date may be None (no usable Date header); imaplib then sends
        # no date and leaves the choice to the server
        status, data = self.imap.append(
            self.args.folder, msg_flags, msg_date, msg.as_string().encode())
        if status == "OK":
            logging.warning('    Success\t: %s', status)
            self.cache[msg_uid] = msg_subject
        else:
            logging.warning('    Return\t\t: %s, %s', status, data)

        return status, data

    def get_msg(self, uid):
        """Fetch an email from the IMAP server.

        Args:
            uid: UID of the message in the currently selected folder.

        Returns:
            A ``(message, flags)`` tuple.

        Raises:
            imaplib.IMAP4.error: If no message body could be fetched within
                the configured number of attempts.
        """

        # Sometimes IMAP servers might return empty bodies, so try again
        for attempt in range(1, self.__RETRIES + 1):
            try:
                result, data = self.imap.uid('fetch', uid,
                                             '(UID BODY.PEEK[] FLAGS)')
            except imaplib.IMAP4.error:
                continue

            # A usable response starts with a (header, body) tuple
            if not data or not isinstance(data[0], tuple):
                logging.warning('  Error\t: '
                                'Could not get a message body. '
                                'Retrying in a few seconds...')
                # No point in waiting after the final attempt
                if attempt < self.__RETRIES:
                    time.sleep(2)
                continue

            body = data[0][1]
            logging.debug('  Result (Size)\t: %s (%d KB)',
                          result, len(body) / 1024)

            msg = self.get_msg_from_struct(data)
            msg_flags = self.get_flags_from_struct(data)

            logging.debug('  Flags\t\t: %s', msg_flags)

            return (msg, msg_flags)

        raise imaplib.IMAP4.error('Could not get a message subject')

    def get_msgs_from_folder(self, folder):
        """Get all emails from a folder on the IMAP server.

        Args:
            folder: Name of the folder to select and search.

        Returns:
            List of message UIDs as returned by the server (bytes); empty
            if the search yields nothing.
        """

        # Safety net: enable read-only if requested
        self.imap.select(folder, readonly=self.args.read_only)

        # Extract email UIDs
        result_mails, data_mails = self.imap.uid('search', None, "ALL")
        if data_mails and data_mails[0]:
            msg_uids = data_mails[0].split()
        else:
            msg_uids = []
        logging.warning('Mails (#)\t: %s (%s)',
                        result_mails, len(msg_uids))

        return msg_uids

    def get_folders(self) -> typing.List[str]:
        """Get the folders from the IMAP server to iterate through.

        Returns:
            ``[args.folder]`` unless ``args.all`` is set; otherwise every
            folder name reported by the server (taken as the text after the
            ``"/"`` delimiter) that does not start with an ignored prefix.
        """

        res, folder_list = self.imap.list()

        logging.warning('Folders (#)\t: %s (%s)', res, len(folder_list))

        if not self.args.all:
            return [self.args.folder]

        names = (item.decode().split('"/"')[-1].strip()
                 for item in folder_list)
        return [name for name in names
                if not name.startswith(self.__IGNORE_PREFIX)]

    @staticmethod
    def convert_date(date):
        """Convert dates to copy old date to new message.

        Args:
            date: Value of a message's Date header, or None.

        Returns:
            The date in IMAP INTERNALDATE representation, or None when the
            header is missing or cannot be parsed.
        """

        if not date:
            return None
        pz_time = email.utils.parsedate_tz(date)
        if pz_time is None:
            return None
        stamp = email.utils.mktime_tz(pz_time)
        return imaplib.Time2Internaldate(stamp)

    @staticmethod
    def get_msg_from_struct(data) -> 'email.message.Message':
        """Convert a fetch response into a message object.

        Args:
            data: Fetch response whose first item is a ``(header, body)``
                tuple with the raw message as bytes.

        Returns:
            The message parsed from the decoded body.
        """

        raw_bytes = data[0][1]
        try:
            raw_email = raw_bytes.decode('utf-8')
        except UnicodeDecodeError:
            # ISO-8859-1 maps every byte, so no further fallback is needed
            raw_email = raw_bytes.decode('iso-8859-1')

        return email.message_from_string(raw_email)

    @staticmethod
    def get_flags_from_struct(data):
        """Get flags to copy old flags to new message.

        Args:
            data: Fetch response; the flags are read from its second item.

        Returns:
            The flags as a space separated string without ``\\Recent``;
            empty if the response carries no flags.
        """

        flags_part = data[1] if len(data) > 1 and data[1] else b''
        flags = imaplib.ParseFlags(flags_part)
        # \Recent is a read-only attribute; dropping the whole token (not a
        # substring) keeps the remaining flags separated by a single space
        kept = [flag for flag in flags if flag.lower() != b'\\recent']
        return b' '.join(kept).decode('utf-8').strip()

    def _load_cache(self):
        """Load cache of processed mail UIDs with their subjects.

        A fresh cache file is written first when none exists or when
        ``args.reset_cache`` is set. An unreadable cache file is replaced by
        an empty in-memory cache instead of aborting.
        """

        # Create new cache if needed
        if self.args.reset_cache:
            self.cache = collections.OrderedDict()
        if self.args.reset_cache or not os.path.exists(self.cache_file):
            self._save_cache()

        # NOTE(security): pickle.load can execute arbitrary code; the cache
        # file must only ever be one written by this program.
        try:
            with open(self.cache_file, 'rb') as filepointer:
                self.cache = pickle.load(filepointer)
        except (EOFError, pickle.UnpicklingError) as error:
            logging.warning('Cache\t\t: unreadable (%s), starting fresh',
                            error)
            self.cache = collections.OrderedDict()

    def _save_cache(self):
        """Save cache of processed mail UIDs with their subjects."""

        with open(self.cache_file, 'wb') as filepointer:
            pickle.dump(self.cache, filepointer, pickle.HIGHEST_PROTOCOL)
339