Passed
Pull Request — develop (#31)
by Bastien
01:43
created

email_fetcher.DecodedMail.get_to_address()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
# -*- coding: utf-8 -*-
2
3
import json
4
import socket
5
import ssl
6
import time
7
import typing
8
from email import message_from_bytes
9
from email.header import decode_header
10
from email.header import make_header
11
from email.message import Message
12
from email.utils import parseaddr
13
14
import filelock
15
import imapclient
16
import markdown
17
import requests
18
from email_reply_parser import EmailReplyParser
19
20
from tracim_backend.exceptions import BadStatusCode
21
from tracim_backend.exceptions import EmptyEmailBody
22
from tracim_backend.exceptions import NoSpecialKeyFound
23
from tracim_backend.exceptions import UnsupportedRequestMethod
24
from tracim_backend.lib.mail_fetcher.email_processing.parser import ParsedHTMLMail  # nopep8
25
from tracim_backend.lib.mail_fetcher.email_processing.sanitizer import HtmlSanitizer  # nopep8
26
from tracim_backend.lib.utils.authentification import TRACIM_API_KEY_HEADER
27
from tracim_backend.lib.utils.authentification import TRACIM_API_USER_EMAIL_LOGIN_HEADER  # nopep8
28
from tracim_backend.lib.utils.logger import logger
29
30
# Name of the mail header that can directly carry the key of a mail.
TRACIM_SPECIAL_KEY_HEADER = 'X-Tracim-Key'
CONTENT_TYPE_TEXT_PLAIN = 'text/plain'
CONTENT_TYPE_TEXT_HTML = 'text/html'

# IMAP flags used by the fetcher to mark a mail once it has been handled.
IMAP_CHECKED_FLAG = imapclient.FLAGGED
IMAP_SEEN_FLAG = imapclient.SEEN

# Timeout given to the file lock acquisition.
MAIL_FETCHER_FILELOCK_TIMEOUT = 10
# Timeout given to the IMAP client connection.
MAIL_FETCHER_CONNECTION_TIMEOUT = 60*3
# Timeout given to the IMAP IDLE check. This should not be more than
# 29 minutes according to rfc2177 (server waits 30 min by default).
MAIL_FETCHER_IDLE_RESPONSE_TIMEOUT = 60*9
41
42
43
class MessageContainer(object):
    """Pair a parsed mail message with the uid it was fetched under."""

    def __init__(self, message: Message, uid: int) -> None:
        """
        :param message: parsed mail message
        :param uid: identifier of the mail in the mailbox
        """
        self.message = message
        self.uid = uid
47
48
49
class DecodedMail(object):
    """Wrapper around a mail message giving access to decoded values.

    Exposes the decoded subject, the sender/recipient addresses, the
    sanitized html body and the key found in the mail headers.
    """

    def __init__(
            self,
            message: Message,
            uid: typing.Optional[int] = None,
    ) -> None:
        """
        :param message: parsed mail message
        :param uid: identifier of the mail in the mailbox, if any
        """
        self._message = message
        self.uid = uid

    def _decode_header(self, header_title: str) -> typing.Optional[str]:
        """
        Decode the value of a mail header.

        :param header_title: name of the header to read
        :return: decoded header value, None if the header is missing
        """
        # FIXME : Handle exception
        if header_title in self._message:
            return str(make_header(decode_header(self._message[header_title])))
        else:
            return None

    def get_subject(self) -> typing.Optional[str]:
        """Return the decoded 'subject' header, None if missing."""
        return self._decode_header('subject')

    def get_from_address(self) -> str:
        """Return the address part of the 'From' header."""
        return parseaddr(self._message['From'])[1]

    def get_to_address(self) -> str:
        """Return the address part of the 'To' header."""
        return parseaddr(self._message['To'])[1]

    def get_first_ref(self) -> str:
        """Return the first address-like value of the 'References' header."""
        return parseaddr(self._message['References'])[1]

    def get_special_key(self) -> typing.Optional[str]:
        """Return the decoded special key header, None if missing."""
        return self._decode_header(TRACIM_SPECIAL_KEY_HEADER)

    def get_body(
            self,
            use_html_parsing=True,
            use_txt_parsing=True,
    ) -> typing.Optional[str]:
        """
        Build the sanitized html body of the mail.

        A text/plain part is converted to html with markdown, a text/html
        part is used as is; both are then sanitized.

        :param use_html_parsing: pass html body through ParsedHTMLMail
        :param use_txt_parsing: pass text body through EmailReplyParser
        :return: sanitized html body, None if no body part is usable
        :raise EmptyEmailBody: if a body part exists but gives an empty body
        """
        body_part = self._get_mime_body_message()
        body = None
        if body_part:
            # iso-8859-1 is used when the part does not declare a charset
            charset = body_part.get_content_charset('iso-8859-1')
            content_type = body_part.get_content_type()
            if content_type == CONTENT_TYPE_TEXT_PLAIN:
                txt_body = body_part.get_payload(decode=True).decode(
                    charset)
                if use_txt_parsing:
                    txt_body = EmailReplyParser.parse_reply(txt_body)
                html_body = markdown.markdown(txt_body)
                body = HtmlSanitizer.sanitize(html_body)

            elif content_type == CONTENT_TYPE_TEXT_HTML:
                html_body = body_part.get_payload(decode=True).decode(
                    charset)
                if use_html_parsing:
                    html_body = str(ParsedHTMLMail(html_body))
                body = HtmlSanitizer.sanitize(html_body)
            if not body:
                raise EmptyEmailBody()
        return body

    def _get_mime_body_message(self) -> typing.Optional[Message]:
        """
        Find the part of the mail to use as body.

        Non-attachment html parts are preferred over non-attachment plain
        text ones. If neither exists, the last part walked is returned.
        """
        # TODO - G.M - 2017-11-16 - Use stdlib msg.get_body feature for py3.6+
        part = None
        # Check for html first, then for plain text
        for wanted_type in (CONTENT_TYPE_TEXT_HTML, CONTENT_TYPE_TEXT_PLAIN):
            for part in self._message.walk():
                content_type = part.get_content_type()
                content_dispo = str(part.get('Content-Disposition'))
                if content_type == wanted_type \
                        and 'attachment' not in content_dispo:
                    return part
        return part

    def get_key(self) -> typing.Optional[str]:
        """
        key is the string contained in some mail header we need to retrieve.
        First try checking special header, then check 'to' header
        and finally check first(oldest) mail-id of 'references' header.

        :return: first key found
        :raise NoSpecialKeyFound: if none of these headers gives a key
        """
        special_key = self.get_special_key()
        if special_key:
            return special_key

        # A 'to' address without key must not prevent the 'references'
        # fallback, so each candidate is tried until one gives a key.
        for address in (self.get_to_address(), self.get_first_ref()):
            if not address:
                continue
            key = DecodedMail.find_key_from_mail_address(address)
            if key:
                return key

        raise NoSpecialKeyFound()

    @classmethod
    def find_key_from_mail_address(
        cls,
        mail_address: str,
    ) -> typing.Optional[str]:
        """ Parse mail_address-like string
        to retrieve key.

        :param mail_address: user+key@something like string
        :return: key, None if the user part is not of the form user+key
        """
        username = mail_address.split('@')[0]
        username_data = username.split('+')
        if len(username_data) == 2:
            return username_data[1]
        return None
159
160
161
class BadIMAPFetchResponse(Exception):
    """Error for an IMAP fetch response that cannot be used as a mail."""
163
164
165
class MailFetcher(object):
    """Fetch mails through IMAP and send them to the tracim api as comments."""

    def __init__(
        self,
        host: str,
        port: str,
        user: str,
        password: str,
        use_ssl: bool,
        folder: str,
        use_idle: bool,
        connection_max_lifetime: int,
        heartbeat: int,
        api_base_url: str,
        api_key: str,
        use_html_parsing: bool,
        use_txt_parsing: bool,
        lockfile_path: str,
        burst: bool,
    ) -> None:
        """
        Fetch mail from a mailbox folder through IMAP and add their content to
        Tracim through http according to mail Headers.
        Fetch is regular.
        :param host: imap server hostname
        :param port: imap connection port
        :param user: user login of mailbox
        :param password: user password of mailbox
        :param use_ssl: use imap over ssl connection
        :param folder: mail folder where new mail are fetched
        :param use_idle: use IMAP IDLE(server notification) when available
        :param heartbeat: seconds to wait before fetching new mail again
        :param connection_max_lifetime: maximum duration allowed for a
             connection . connection are automatically renew when their
             lifetime excess this duration.
        :param api_base_url: url to get access to tracim api
        :param api_key: tracim api key
        :param use_html_parsing: parse html mail
        :param use_txt_parsing: parse txt mail
        :param lockfile_path: path of the lock file acquired while mails
             are checked
        :param burst: if true, stop the fetcher after the first mail check
        """
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.use_ssl = use_ssl
        self.folder = folder
        self.heartbeat = heartbeat
        self.use_idle = use_idle
        self.connection_max_lifetime = connection_max_lifetime
        self.api_base_url = api_base_url
        self.api_key = api_key
        self.use_html_parsing = use_html_parsing
        self.use_txt_parsing = use_txt_parsing
        self.lock = filelock.FileLock(lockfile_path)
        self._is_active = True
        self.burst = burst

    def run(self) -> None:
        """
        Main loop: connect to the mailbox, check mails, then wait (IDLE or
        sleep) and check again until the fetcher is stopped.

        Errors are logged and the connection is closed; unless stopped, a
        new connection is then opened.
        """
        logger.info(self, 'Starting MailFetcher')
        while self._is_active:
            imapc = None
            sleep_after_connection = True
            try:
                imapc = imapclient.IMAPClient(
                    self.host,
                    self.port,
                    ssl=self.use_ssl,
                    timeout=MAIL_FETCHER_CONNECTION_TIMEOUT
                )
                imapc.login(self.user, self.password)

                logger.debug(self, 'Select folder {}'.format(
                    self.folder,
                ))
                imapc.select_folder(self.folder)

                # force renew connection when deadline is reached
                deadline = time.time() + self.connection_max_lifetime
                while True:
                    if not self._is_active:
                        logger.warning(self, 'Mail Fetcher process aborted')
                        sleep_after_connection = False
                        break

                    if time.time() > deadline:
                        logger.debug(
                            self,
                            "MailFetcher Connection Lifetime limit excess"
                            ", Try Re-new connection")
                        sleep_after_connection = False
                        break

                    # check for new mails
                    self._check_mail(imapc)

                    if self.use_idle and imapc.has_capability('IDLE'):
                        # IDLE_mode wait until event from server
                        logger.debug(self, 'wail for event(IDLE)')
                        imapc.idle()
                        imapc.idle_check(
                            timeout=MAIL_FETCHER_IDLE_RESPONSE_TIMEOUT
                        )
                        imapc.idle_done()
                    else:
                        if self.use_idle and not imapc.has_capability('IDLE'):
                            log = 'IDLE mode activated but server do not ' \
                                  'support it, use polling instead.'
                            logger.warning(self, log)

                        if self.burst:
                            self.stop()
                            break
                        # normal polling mode : sleep a define duration
                        logger.debug(self,
                                     'sleep for {}'.format(self.heartbeat))
                        time.sleep(self.heartbeat)

                    if self.burst:
                        self.stop()
                        break
            # INFO - Handlers are ordered from most to least specific:
            # ssl and timeout errors are kinds of socket.error (OSError)
            # and would never be reached if socket.error came first.
            # SSL
            except ssl.CertificateError as e:
                log = 'SSL Certificate verification failed on IMAP connection {}'  # nopep8
                logger.error(self, log.format(str(e)))

            except ssl.SSLError as e:
                log = 'SSL error on IMAP connection {}'
                logger.error(self, log.format(str(e)))

            # Socket
            except socket.timeout as e:
                log = 'Socket timeout on IMAP connection {}'
                logger.error(self, log.format(str(e)))

            except (socket.error,
                    socket.gaierror,
                    socket.herror) as e:
                log = 'Socket fail with IMAP connection {}'
                logger.error(self, log.format(str(e)))

            # Filelock
            except filelock.Timeout as e:
                log = 'Mail Fetcher Lock Timeout {}'
                logger.warning(self, log.format(str(e)))

            # IMAP
            # TODO - G.M - 10-01-2017 - Support imapclient exceptions
            # when Imapclient stable will be 2.0+

            except BadIMAPFetchResponse as e:
                log = 'Imap Fetch command return bad response. ' \
                      'Is someone else connected to the mailbox ?: ' \
                      '{}'
                logger.error(self, log.format(str(e)))
            # Others
            except Exception as e:
                log = 'Mail Fetcher error {}'
                logger.error(self, log.format(str(e)))

            finally:
                # INFO - G.M - 2018-01-09 - Connection closing
                # Properly close connection according to
                # https://github.com/mjs/imapclient/pull/279/commits/043e4bd0c5c775c5a08cb5f1baa93876a46732ee
                # TODO : Use __exit__ method instead when imapclient stable will
                # be 2.0+ .
                if imapc:
                    logger.debug(self, 'Try logout')
                    try:
                        imapc.logout()
                    except Exception:
                        try:
                            imapc.shutdown()
                        except Exception as e:
                            log = "Can't logout, connection broken ? {}"
                            logger.error(self, log.format(str(e)))

            if self.burst:
                self.stop()
                break

            if sleep_after_connection:
                logger.debug(self, 'sleep for {}'.format(self.heartbeat))
                time.sleep(self.heartbeat)

        log = 'Mail Fetcher stopped'
        logger.debug(self, log)

    def _check_mail(self, imapc: imapclient.IMAPClient) -> None:
        """
        Fetch new mails and transmit them to tracim while holding the
        file lock.

        :param imapc: connected imap client, folder already selected
        """
        with self.lock.acquire(
                timeout=MAIL_FETCHER_FILELOCK_TIMEOUT
        ):
            messages = self._fetch(imapc)
            cleaned_mails = [DecodedMail(m.message, m.uid)
                             for m in messages]
            self._notify_tracim(cleaned_mails, imapc)

    def stop(self) -> None:
        """Mark the fetcher as inactive so that run() loops end."""
        self._is_active = False

    def _fetch(
        self,
        imapc: imapclient.IMAPClient,
    ) -> typing.List[MessageContainer]:
        """
        Get news message from mailbox
        :param imapc: connected imap client, folder already selected
        :return: list of new mails
        :raise BadIMAPFetchResponse: if a fetch response has no mail body
        """
        messages = []

        logger.debug(self, 'Fetch unflagged messages')
        uids = imapc.search(['UNFLAGGED'])
        logger.debug(self, 'Found {} unflagged mails'.format(
            len(uids),
        ))
        for msgid, data in imapc.fetch(uids, ['BODY.PEEK[]']).items():
            # INFO - G.M - 2017-12-08 - Fetch BODY.PEEK[]
            # Retrieve all mail(body and header) but don't set mail
            # as seen because of PEEK
            # see rfc3501
            logger.debug(self, 'Fetch mail "{}"'.format(
                msgid,
            ))

            try:
                msg = message_from_bytes(data[b'BODY[]'])
            except KeyError as e:
                # INFO - G.M - 12-01-2018 - Fetch may return events response
                # In some specific case, fetch command may return events
                # response unrelated to fetch request.
                # This should happen only when someone-else use the mailbox
                # at the same time of the fetcher.
                # see https://github.com/mjs/imapclient/issues/334
                except_msg = 'fetch response : {}'.format(str(data))
                raise BadIMAPFetchResponse(except_msg) from e

            msg_container = MessageContainer(msg, msgid)
            messages.append(msg_container)

        return messages

    def _notify_tracim(
        self,
        mails: typing.List[DecodedMail],
        imapc: imapclient.IMAPClient
    ) -> None:
        """
        Send http request to tracim endpoint
        :param mails: list of mails to send, emptied by this method
        :param imapc: imap client used to flag transmitted mails
        :return: none
        """
        logger.debug(self, 'Notify tracim about {} new responses'.format(
            len(mails),
        ))
        # TODO BS 20171124: Look around mail.get_from_address(), mail.get_key()
        # , mail.get_body() etc ... for raise InvalidEmailError if missing
        #  required informations (actually get_from_address raise IndexError
        #  if no from address for example) and catch it here
        while mails:
            mail = mails.pop()
            try:
                method, endpoint, json_body_dict = self._create_comment_request(mail)  # nopep8
            except NoSpecialKeyFound as exc:
                log = 'Failed to create comment request due to missing specialkey in mail {}'  # nopep8
                logger.error(self, log.format(str(exc)))
                continue
            except EmptyEmailBody:
                log = 'Empty body, skip mail'
                logger.error(self, log)
                continue
            except Exception as exc:
                log = 'Failed to create comment request in mail fetcher error {}'  # nopep8
                logger.error(self, log.format(str(exc)))
                continue

            # NOTE(review): errors other than requests ones (BadStatusCode
            # for example) are not caught here and stop the processing of
            # remaining mails - confirm this is intended.
            try:
                self._send_request(
                    mail=mail,
                    imapc=imapc,
                    method=method,
                    endpoint=endpoint,
                    json_body_dict=json_body_dict,
                )
            except requests.exceptions.Timeout as e:
                log = 'Timeout error to transmit fetched mail to tracim : {}'
                logger.error(self, log.format(str(e)))
            except requests.exceptions.RequestException as e:
                log = 'Fail to transmit fetched mail to tracim : {}'
                logger.error(self, log.format(str(e)))

    def _get_auth_headers(self, user_email) -> dict:
        """
        Build the http headers used to authenticate on tracim api.

        :param user_email: email sent as user login header
        :return: dict of http headers
        """
        return {
            TRACIM_API_KEY_HEADER: self.api_key,
            TRACIM_API_USER_EMAIL_LOGIN_HEADER: user_email
        }

    def _get_content_info(self, content_id, user_email):
        """
        Get info about a content from tracim api.

        :param content_id: id of the content
        :param user_email: email used to authenticate the request
        :return: json decoded response
        :raise BadStatusCode: if response status is neither 200 nor 204
        """
        endpoint = '{api_base_url}contents/{content_id}'.format(
            api_base_url=self.api_base_url,
            content_id=content_id,
        )
        result = requests.get(
            endpoint,
            headers=self._get_auth_headers(user_email)
        )
        if result.status_code not in [200, 204]:
            details = result.json().get('message')
            msg = 'bad status code {}(200 is valid) response when trying to get info about a content: {}'  # nopep8
            msg = msg.format(str(result.status_code), details)
            raise BadStatusCode(msg)
        return result.json()

    def _create_comment_request(self, mail: DecodedMail) -> typing.Tuple[str, str, dict]:  # nopep8
        """
        Prepare the request creating a comment from a mail.

        :param mail: mail to convert
        :return: tuple of http method, endpoint url and json body
        """
        content_id = mail.get_key()
        content_info = self._get_content_info(content_id, mail.get_from_address())  # nopep8
        mail_body = mail.get_body(
            use_html_parsing=self.use_html_parsing,
            use_txt_parsing=self.use_txt_parsing,
        )
        endpoint = '{api_base_url}workspaces/{workspace_id}/contents/{content_id}/comments'.format(  # nopep8
            api_base_url=self.api_base_url,
            content_id=content_id,
            workspace_id=content_info['workspace_id']
        )
        method = 'POST'
        body = {
            'raw_content': mail_body
        }
        return method, endpoint, body

    def _send_request(
            self,
            mail: DecodedMail,
            imapc: imapclient.IMAPClient,
            method: str,
            endpoint: str,
            json_body_dict: dict
    ):
        """
        Send a request to tracim api and flag the mail on success.

        :param mail: mail the request was built from
        :param imapc: imap client used to flag the mail
        :param method: http method, only 'POST' is supported
        :param endpoint: url to request
        :param json_body_dict: json body of the request
        :raise UnsupportedRequestMethod: if method is not 'POST'
        :raise BadStatusCode: if response status is neither 200 nor 204
        """
        logger.debug(
            self,
            'Contact API on {endpoint} with method {method} with body {body}'.format(   # nopep8
                endpoint=endpoint,
                method=method,
                body=str(json_body_dict),
            ),
        )
        if method == 'POST':
            request_method = requests.post
        else:
            # TODO - G.M - 2018-08-24 - Better handling exception
            raise UnsupportedRequestMethod('Request method not supported')

        r = request_method(
            url=endpoint,
            json=json_body_dict,
            headers=self._get_auth_headers(mail.get_from_address()),
        )
        if r.status_code not in [200, 204]:
            details = r.json().get('message')
            msg = 'bad status code {} (200 and 204 are valid) response when sending mail to tracim: {}'  # nopep8
            msg = msg.format(str(r.status_code), details)
            raise BadStatusCode(msg)
        # Flag all correctly checked mail: status is 200 or 204 here
        imapc.add_flags((mail.uid,), IMAP_CHECKED_FLAG)
        imapc.add_flags((mail.uid,), IMAP_SEEN_FLAG)
529