email_fetcher   F
last analyzed

Complexity

Total Complexity 74

Size/Duplication

Total Lines 531
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 74
eloc 361
dl 0
loc 531
rs 2.48
c 0
b 0
f 0

22 Methods

Rating   Name   Duplication   Size   Complexity  
B DecodedMail._get_mime_body_message() 0 18 7
A DecodedMail.get_first_ref() 0 2 1
B DecodedMail.get_body() 0 27 7
A DecodedMail.__init__() 0 3 1
A DecodedMail.get_key() 0 19 4
A MessageContainer.__init__() 0 3 1
A DecodedMail.find_key_from_mail_address() 0 16 2
A DecodedMail.get_subject() 0 2 1
A DecodedMail._decode_header() 0 6 2
A DecodedMail.get_to_address() 0 2 1
A DecodedMail.get_from_address() 0 2 1
A DecodedMail.get_special_key() 0 2 1
A MailFetcher.__init__() 0 56 1
A MailFetcher.stop() 0 2 1
B MailFetcher._notify_tracim() 0 48 7
A MailFetcher._get_content_info() 0 15 2
A MailFetcher._create_comment_request() 0 17 1
A MailFetcher._check_mail() 0 8 2
A MailFetcher._send_request() 0 36 4
A MailFetcher._get_auth_headers() 0 4 1
F MailFetcher.run() 0 129 23
A MailFetcher._fetch() 0 40 3

How to fix   Complexity   

Complexity

Complex modules like email_fetcher often do many different things. To break such a module down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
3
import json
4
import socket
5
import ssl
6
import time
7
import typing
8
from email import message_from_bytes
9
from email.header import decode_header
10
from email.header import make_header
11
from email.message import Message
12
from email.utils import parseaddr
13
14
import filelock
15
import imapclient
16
import markdown
17
import requests
18
from email_reply_parser import EmailReplyParser
19
20
from tracim_backend.exceptions import BadStatusCode
21
from tracim_backend.exceptions import EmptyEmailBody
22
from tracim_backend.exceptions import NoSpecialKeyFound
23
from tracim_backend.exceptions import UnsupportedRequestMethod
24
from tracim_backend.lib.mail_fetcher.email_processing.parser import ParsedHTMLMail  # nopep8
25
from tracim_backend.lib.mail_fetcher.email_processing.sanitizer import HtmlSanitizer  # nopep8
26
from tracim_backend.lib.utils.authentification import TRACIM_API_KEY_HEADER
27
from tracim_backend.lib.utils.authentification import TRACIM_API_USER_EMAIL_LOGIN_HEADER  # nopep8
28
from tracim_backend.lib.utils.logger import logger
29
30
# Mail header that may carry the Tracim key directly (see DecodedMail).
TRACIM_SPECIAL_KEY_HEADER = 'X-Tracim-Key'
# MIME types accepted as mail body.
CONTENT_TYPE_TEXT_PLAIN = 'text/plain'
CONTENT_TYPE_TEXT_HTML = 'text/html'

# IMAP flags set on a mail once it has been sent to Tracim.
IMAP_CHECKED_FLAG = imapclient.FLAGGED
IMAP_SEEN_FLAG = imapclient.SEEN

# Timeouts, in seconds.
MAIL_FETCHER_FILELOCK_TIMEOUT = 10
MAIL_FETCHER_CONNECTION_TIMEOUT = 3 * 60
# Maximum wait for a server event in IDLE mode. RFC 2177 asks clients to
# re-issue IDLE at least every 29 minutes (servers may drop a client after
# 30 minutes of inactivity), so keep this value below 29 minutes.
MAIL_FETCHER_IDLE_RESPONSE_TIMEOUT = 9 * 60
41
42
43
class MessageContainer(object):
    """
    Pair a parsed mail with the id the IMAP fetch returned for it.

    :param message: parsed mail
    :param uid: id of the mail in the IMAP fetch response
    """

    def __init__(self, message: Message, uid: int) -> None:
        self.uid = uid
        self.message = message
47
48
49
class DecodedMail(object):
    """
    Accessors over a parsed mail: decoded headers, body and the Tracim key
    (the id of the content the mail replies to).
    """

    def __init__(self, message: Message, uid: typing.Optional[int] = None) -> None:
        """
        :param message: parsed mail
        :param uid: IMAP id of the mail, used to flag it once processed
        """
        self._message = message
        self.uid = uid

    def _decode_header(self, header_title: str) -> typing.Optional[str]:
        """
        Return header `header_title` with RFC 2047 encoded-words decoded,
        or None when the mail has no such header.
        """
        # FIXME : Handle exception
        if header_title in self._message:
            return str(make_header(decode_header(self._message[header_title])))
        return None

    def get_subject(self) -> typing.Optional[str]:
        """Decoded Subject header, or None if absent."""
        return self._decode_header('subject')

    def get_from_address(self) -> str:
        """Bare address of the From header ('' if absent/unparsable)."""
        return parseaddr(self._message['From'])[1]

    def get_to_address(self) -> str:
        """Bare address of the To header ('' if absent/unparsable)."""
        return parseaddr(self._message['To'])[1]

    def get_first_ref(self) -> str:
        """First address-like mail-id of the References header ('' if none)."""
        return parseaddr(self._message['References'])[1]

    def get_special_key(self) -> typing.Optional[str]:
        """Decoded TRACIM_SPECIAL_KEY_HEADER value, or None if absent."""
        return self._decode_header(TRACIM_SPECIAL_KEY_HEADER)

    def get_body(
            self,
            use_html_parsing=True,
            use_txt_parsing=True,
    ) -> typing.Optional[str]:
        """
        Return the sanitized HTML body of the mail.

        The html part is preferred over the plain text one (see
        _get_mime_body_message); plain text is converted to HTML with
        markdown.

        :param use_html_parsing: pass html bodies through ParsedHTMLMail
        :param use_txt_parsing: reduce plain text bodies to the reply with
            EmailReplyParser.parse_reply
        :return: sanitized HTML body, or None if no body part was found
        :raises EmptyEmailBody: if the selected part yields no content
            (including when it is neither text/plain nor text/html)
        """
        body_part = self._get_mime_body_message()
        # Compare with None explicitly: Message.__len__ counts headers, so
        # a valid MIME subpart without any header would be falsy.
        if body_part is None:
            return None

        charset = body_part.get_content_charset('iso-8859-1')
        content_type = body_part.get_content_type()
        body = None
        if content_type == CONTENT_TYPE_TEXT_PLAIN:
            # 'replace' keeps the reply readable when a few bytes do not
            # match the declared charset, instead of dropping the mail.
            txt_body = body_part.get_payload(decode=True).decode(
                charset, 'replace')
            if use_txt_parsing:
                txt_body = EmailReplyParser.parse_reply(txt_body)
            html_body = markdown.markdown(txt_body)
            body = HtmlSanitizer.sanitize(html_body)
        elif content_type == CONTENT_TYPE_TEXT_HTML:
            html_body = body_part.get_payload(decode=True).decode(
                charset, 'replace')
            if use_html_parsing:
                html_body = str(ParsedHTMLMail(html_body))
            body = HtmlSanitizer.sanitize(html_body)

        if not body:
            raise EmptyEmailBody()
        return body

    def _get_mime_body_message(self) -> typing.Optional[Message]:
        """
        Pick the MIME part holding the mail body.

        Preference order: the first text/html part whose Content-Disposition
        does not mention 'attachment', then the first such text/plain part.
        If neither exists, the last part yielded by Message.walk() is
        returned as a fallback.
        """
        # TODO - G.M - 2017-11-16 - Use stdlib msg.get_body feature for py3.6+
        parts = list(self._message.walk())
        for wanted_type in (CONTENT_TYPE_TEXT_HTML, CONTENT_TYPE_TEXT_PLAIN):
            for part in parts:
                content_dispo = str(part.get('Content-Disposition'))
                if part.get_content_type() == wanted_type \
                        and 'attachment' not in content_dispo:
                    return part
        return parts[-1] if parts else None

    def get_key(self) -> str:
        """
        Return the key (content id) this mail refers to.

        Sources are tried in order; the first one yielding a key wins:
        1. the special header (TRACIM_SPECIAL_KEY_HEADER), used as-is;
        2. the 'To' address, as user+key@domain;
        3. the first (oldest) mail-id of the 'References' header, same form.

        :raises NoSpecialKeyFound: if no source yields a key
        """
        special_key = self.get_special_key()
        if special_key:
            return special_key

        # A To address without '+key' must not stop the search: fall back
        # to the References header as documented above.
        for address in (self.get_to_address(), self.get_first_ref()):
            if address:
                key = self.find_key_from_mail_address(address)
                if key:
                    return key

        raise NoSpecialKeyFound()

    @classmethod
    def find_key_from_mail_address(
        cls,
        mail_address: str,
    ) -> typing.Optional[str]:
        """
        Extract the key from a mail-address-like string.

        :param mail_address: 'user+key@something' like string
        :return: key, or None unless the local part holds exactly one '+'
        """
        username = mail_address.split('@')[0]
        username_data = username.split('+')
        if len(username_data) == 2:
            return username_data[1]
        return None
159
160
161
class BadIMAPFetchResponse(Exception):
    """
    Raised by MailFetcher._fetch when an IMAP FETCH response entry does not
    contain the requested mail body.
    """
163
164
165
class MailFetcher(object):
    def __init__(
        self,
        host: str,
        port: str,
        user: str,
        password: str,
        use_ssl: bool,
        folder: str,
        use_idle: bool,
        connection_max_lifetime: int,
        heartbeat: int,
        api_base_url: str,
        api_key: str,
        use_html_parsing: bool,
        use_txt_parsing: bool,
        lockfile_path: str,
        burst: bool,
    ) -> None:
        """
        Fetch mail from a mailbox folder through IMAP and add their content to
        Tracim through http according to mail headers.
        Fetch is regular.
        :param host: imap server hostname
        :param port: imap connection port
        :param user: user login of mailbox
        :param password: user password of mailbox
        :param use_ssl: use imap over ssl connection
        :param folder: mail folder where new mail are fetched
        :param use_idle: use IMAP IDLE (server notification) when available
        :param connection_max_lifetime: maximum duration (seconds) allowed for
             a connection. Connections are automatically renewed when their
             lifetime exceeds this duration.
        :param heartbeat: seconds to wait before fetching new mail again
        :param api_base_url: url to get access to tracim api; endpoint paths
             are appended to it directly, so it is expected to end with '/'
        :param api_key: tracim api key
        :param use_html_parsing: parse html mail
        :param use_txt_parsing: parse txt mail
        :param lockfile_path: path of the file lock held while checking mail
        :param burst: if true, run only one time,
             if false run as continuous daemon.
        """
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.use_ssl = use_ssl
        self.folder = folder
        self.heartbeat = heartbeat
        self.use_idle = use_idle
        self.connection_max_lifetime = connection_max_lifetime
        self.api_base_url = api_base_url
        self.api_key = api_key
        self.use_html_parsing = use_html_parsing
        self.use_txt_parsing = use_txt_parsing
        self.lock = filelock.FileLock(lockfile_path)
        self._is_active = True
        self.burst = burst

    def run(self) -> None:
        """
        Main loop: connect to the IMAP server and check mail until stop()
        is called (or after one check in burst mode).

        Errors are logged and followed by a reconnection after `heartbeat`
        seconds; lifetime-based renewals reconnect immediately.
        """
        logger.info(self, 'Starting MailFetcher')
        while self._is_active:
            imapc = None
            sleep_after_connection = True
            try:
                imapc = imapclient.IMAPClient(
                    self.host,
                    self.port,
                    ssl=self.use_ssl,
                    timeout=MAIL_FETCHER_CONNECTION_TIMEOUT
                )
                imapc.login(self.user, self.password)

                logger.debug(self, 'Select folder {}'.format(
                    self.folder,
                ))
                imapc.select_folder(self.folder)

                sleep_after_connection = self._process_connection(imapc)
            # INFO: handlers go from most to least specific. socket.timeout,
            # ssl.SSLError and (Python 3.7+) ssl.CertificateError are all
            # OSError (== socket.error), and filelock.Timeout subclasses
            # TimeoutError in recent filelock releases; a socket.error
            # handler placed first would swallow all of them.
            # Filelock
            except filelock.Timeout as e:
                log = 'Mail Fetcher Lock Timeout {}'
                logger.warning(self, log.format(str(e)))
            # Socket
            except socket.timeout as e:
                log = 'Socket timeout on IMAP connection {}'
                logger.error(self, log.format(str(e)))
            # SSL
            except ssl.CertificateError as e:
                log = 'SSL Certificate verification failed on IMAP connection {}'  # nopep8
                logger.error(self, log.format(str(e)))
            except ssl.SSLError as e:
                log = 'SSL error on IMAP connection {}'
                logger.error(self, log.format(str(e)))
            except (socket.error,
                    socket.gaierror,
                    socket.herror) as e:
                log = 'Socket fail with IMAP connection {}'
                logger.error(self, log.format(str(e)))
            # IMAP
            # TODO - G.M - 10-01-2017 - Support imapclient exceptions
            # when Imapclient stable will be 2.0+
            except BadIMAPFetchResponse as e:
                log = 'Imap Fetch command return bad response. ' \
                      'Is someone else connected to the mailbox ?: ' \
                      '{}'
                logger.error(self, log.format(str(e)))
            # Others
            except Exception as e:
                log = 'Mail Fetcher error {}'
                logger.error(self, log.format(str(e)))
            finally:
                if imapc:
                    self._close_connection(imapc)

            if self.burst:
                self.stop()
                break

            if sleep_after_connection:
                logger.debug(self, 'sleep for {}'.format(self.heartbeat))
                time.sleep(self.heartbeat)

        log = 'Mail Fetcher stopped'
        logger.debug(self, log)

    def _process_connection(self, imapc: imapclient.IMAPClient) -> bool:
        """
        Check mail repeatedly over an open, logged-in connection.

        :return: whether run() should sleep `heartbeat` seconds before
            reconnecting; False when leaving because stop() was called or
            connection_max_lifetime was reached.
        """
        # force renew connection when deadline is reached
        deadline = time.time() + self.connection_max_lifetime
        while True:
            if not self._is_active:
                logger.warning(self, 'Mail Fetcher process aborted')
                return False

            if time.time() > deadline:
                logger.debug(
                    self,
                    "MailFetcher Connection Lifetime limit excess"
                    ", Try Re-new connection")
                return False

            # check for new mails
            self._check_mail(imapc)

            # Burst mode performs a single check: stop before waiting, so an
            # IDLE wait (up to MAIL_FETCHER_IDLE_RESPONSE_TIMEOUT) is skipped.
            if self.burst:
                self.stop()
                return True

            self._wait_for_new_mail(imapc)

    def _wait_for_new_mail(self, imapc: imapclient.IMAPClient) -> None:
        """
        Block until the next check: wait for a server event when IDLE is
        enabled and supported, otherwise sleep `heartbeat` seconds.
        """
        if self.use_idle and imapc.has_capability('IDLE'):
            # IDLE_mode wait until event from server
            logger.debug(self, 'wait for event(IDLE)')
            imapc.idle()
            imapc.idle_check(
                timeout=MAIL_FETCHER_IDLE_RESPONSE_TIMEOUT
            )
            imapc.idle_done()
            return

        if self.use_idle:
            log = 'IDLE mode activated but server do not ' \
                  'support it, use polling instead.'
            logger.warning(self, log)

        # normal polling mode : sleep a define duration
        logger.debug(self, 'sleep for {}'.format(self.heartbeat))
        time.sleep(self.heartbeat)

    def _close_connection(self, imapc: imapclient.IMAPClient) -> None:
        """
        Log out from the server, falling back to shutdown() when logout
        fails; a failing shutdown is only logged.
        """
        # INFO - G.M - 2018-01-09 - Connection closing
        # Properly close connection according to
        # https://github.com/mjs/imapclient/pull/279/commits/043e4bd0c5c775c5a08cb5f1baa93876a46732ee
        # TODO : Use __exit__ method instead when imapclient stable will
        # be 2.0+ .
        logger.debug(self, 'Try logout')
        try:
            imapc.logout()
        except Exception:
            try:
                imapc.shutdown()
            except Exception as e:
                log = "Can't logout, connection broken ? {}"
                logger.error(self, log.format(str(e)))

    def _check_mail(self, imapc: imapclient.IMAPClient) -> None:
        """
        Fetch unflagged mails and forward them to Tracim while holding the
        file lock.

        :raises filelock.Timeout: if the lock is not acquired within
            MAIL_FETCHER_FILELOCK_TIMEOUT seconds
        """
        with self.lock.acquire(
                timeout=MAIL_FETCHER_FILELOCK_TIMEOUT
        ):
            messages = self._fetch(imapc)
            cleaned_mails = [DecodedMail(m.message, m.uid)
                             for m in messages]
            self._notify_tracim(cleaned_mails, imapc)

    def stop(self) -> None:
        """Ask run() to exit; it checks this flag between mail checks."""
        self._is_active = False

    def _fetch(
        self,
        imapc: imapclient.IMAPClient,
    ) -> typing.List[MessageContainer]:
        """
        Get new messages (those without the FLAGGED flag) from the mailbox.

        :return: list of new mails
        :raises BadIMAPFetchResponse: if a fetch response entry has no body
        """
        messages = []

        logger.debug(self, 'Fetch unflagged messages')
        uids = imapc.search(['UNFLAGGED'])
        logger.debug(self, 'Found {} unflagged mails'.format(
            len(uids),
        ))
        for msgid, data in imapc.fetch(uids, ['BODY.PEEK[]']).items():
            # INFO - G.M - 2017-12-08 - Fetch BODY.PEEK[]
            # Retrieve all mail(body and header) but don't set mail
            # as seen because of PEEK
            # see rfc3501
            logger.debug(self, 'Fetch mail "{}"'.format(
                msgid,
            ))

            try:
                msg = message_from_bytes(data[b'BODY[]'])
            except KeyError as e:
                # INFO - G.M - 12-01-2018 - Fetch may return events response
                # In some specific case, fetch command may return events
                # response unrelated to fetch request.
                # This should happen only when someone-else use the mailbox
                # at the same time of the fetcher.
                # see https://github.com/mjs/imapclient/issues/334
                except_msg = 'fetch response : {}'.format(str(data))
                raise BadIMAPFetchResponse(except_msg) from e

            messages.append(MessageContainer(msg, msgid))

        return messages

    def _notify_tracim(
        self,
        mails: typing.List[DecodedMail],
        imapc: imapclient.IMAPClient
    ) -> None:
        """
        Post each mail as a comment to Tracim, in fetch order; mails sent
        successfully are flagged.

        A failure on one mail is logged and that mail is skipped (left
        unflagged, so it is fetched again on the next check); it does not
        prevent the remaining mails from being sent.
        :param mails: list of mails to send
        :return: none
        """
        logger.debug(self, 'Notify tracim about {} new responses'.format(
            len(mails),
        ))
        # TODO BS 20171124: Look around mail.get_from_address(), mail.get_key()
        # , mail.get_body() etc ... for raise InvalidEmailError if missing
        #  required informations (actually get_from_address raise IndexError
        #  if no from address for example) and catch it here
        for mail in mails:
            try:
                method, endpoint, json_body_dict = self._create_comment_request(mail)  # nopep8
            except NoSpecialKeyFound:
                log = 'Failed to create comment request due to missing specialkey in mail {}'  # nopep8
                logger.error(self, log.format(mail.uid))
                continue
            except EmptyEmailBody:
                log = 'Empty body, skip mail'
                logger.error(self, log)
                continue
            except Exception as exc:
                log = 'Failed to create comment request in mail fetcher error {}'  # nopep8
                logger.error(self, log.format(str(exc)))
                continue

            try:
                self._send_request(
                    mail=mail,
                    imapc=imapc,
                    method=method,
                    endpoint=endpoint,
                    json_body_dict=json_body_dict,
                )
            except requests.exceptions.Timeout as e:
                log = 'Timeout error to transmit fetched mail to tracim : {}'
                logger.error(self, log.format(str(e)))
            except requests.exceptions.RequestException as e:
                log = 'Fail to transmit fetched mail to tracim : {}'
                logger.error(self, log.format(str(e)))
            except BadStatusCode as e:
                log = 'Tracim refused fetched mail {} : {}'
                logger.error(self, log.format(mail.uid, str(e)))

    def _get_auth_headers(self, user_email: str) -> dict:
        """Http headers authenticating as `user_email` with the api key."""
        return {
            TRACIM_API_KEY_HEADER: self.api_key,
            TRACIM_API_USER_EMAIL_LOGIN_HEADER: user_email
        }

    @staticmethod
    def _get_error_details(response: requests.Response) -> typing.Optional[str]:
        """
        Return the 'message' field of a JSON error response, or the raw
        response text when the body is not a JSON object (e.g. an HTML
        error page served by a proxy).
        """
        try:
            payload = response.json()
        except ValueError:
            return response.text
        if isinstance(payload, dict):
            return payload.get('message')
        return response.text

    def _get_content_info(self, content_id: str, user_email: str) -> dict:
        """
        Fetch content `content_id` from the Tracim api as `user_email`.

        :raises BadStatusCode: on a status code other than 200/204
        """
        endpoint = '{api_base_url}contents/{content_id}'.format(
            api_base_url=self.api_base_url,
            content_id=content_id,
        )
        # A timeout keeps a stalled api from blocking the fetcher (and the
        # file lock it holds) forever.
        result = requests.get(
            endpoint,
            headers=self._get_auth_headers(user_email),
            timeout=MAIL_FETCHER_CONNECTION_TIMEOUT,
        )
        if result.status_code not in [200, 204]:
            details = self._get_error_details(result)
            msg = 'bad status code {}(200 is valid) response when trying to get info about a content: {}'  # nopep8
            msg = msg.format(str(result.status_code), details)
            raise BadStatusCode(msg)
        return result.json()

    def _create_comment_request(self, mail: DecodedMail) -> typing.Tuple[str, str, dict]:  # nopep8
        """
        Build the (method, endpoint, json body) of the request posting
        `mail` as a comment on the content it refers to.

        :raises NoSpecialKeyFound: if the mail carries no key
        :raises EmptyEmailBody: if the mail has an empty body
        :raises BadStatusCode: if content info cannot be retrieved
        """
        content_id = mail.get_key()
        # Body is computed before the api call so that empty mails do not
        # cost an http request.
        mail_body = mail.get_body(
            use_html_parsing=self.use_html_parsing,
            use_txt_parsing=self.use_txt_parsing,
        )
        content_info = self._get_content_info(content_id, mail.get_from_address())  # nopep8
        endpoint = '{api_base_url}workspaces/{workspace_id}/contents/{content_id}/comments'.format(  # nopep8
            api_base_url=self.api_base_url,
            content_id=content_id,
            workspace_id=content_info['workspace_id']
        )
        method = 'POST'
        body = {
            'raw_content': mail_body
        }
        return method, endpoint, body

    def _send_request(
            self,
            mail: DecodedMail,
            imapc: imapclient.IMAPClient,
            method: str,
            endpoint: str,
            json_body_dict: dict
    ) -> None:
        """
        Send the request built for `mail` and flag the mail as checked and
        seen on success.

        :raises UnsupportedRequestMethod: if method is not 'POST'
        :raises BadStatusCode: on a status code other than 200/204
        """
        logger.debug(
            self,
            'Contact API on {endpoint} with method {method} with body {body}'.format(   # nopep8
                endpoint=endpoint,
                method=method,
                body=str(json_body_dict),
            ),
        )
        if method == 'POST':
            request_method = requests.post
        else:
            # TODO - G.M - 2018-08-24 - Better handling exception
            raise UnsupportedRequestMethod('Request method not supported')

        r = request_method(
            url=endpoint,
            json=json_body_dict,
            headers=self._get_auth_headers(mail.get_from_address()),
            timeout=MAIL_FETCHER_CONNECTION_TIMEOUT,
        )
        if r.status_code not in [200, 204]:
            details = self._get_error_details(r)
            msg = 'bad status code {} (200 and 204 are valid) response when sending mail to tracim: {}'  # nopep8
            msg = msg.format(str(r.status_code), details)
            raise BadStatusCode(msg)
        # Flag correctly checked mail so the UNFLAGGED search skips it
        imapc.add_flags((mail.uid,), IMAP_CHECKED_FLAG)
        imapc.add_flags((mail.uid,), IMAP_SEEN_FLAG)
531