GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 831043...62ff6e )
by Bastien
16s
created

DecodedMail.get_body()   A

Complexity

Conditions 4

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 4
dl 0
loc 17
rs 9.2
c 3
b 0
f 0
1
# -*- coding: utf-8 -*-
2
3
import sys
4
import time
5
import imaplib
6
import datetime
7
import json
8
import typing
9
from email.message import Message
10
from email.header import Header, decode_header, make_header
11
from email.utils import parseaddr, parsedate_tz, mktime_tz
12
from email import message_from_bytes
13
14
import markdown
15
import requests
16
from bs4 import BeautifulSoup, Tag
17
from email_reply_parser import EmailReplyParser
18
19
from tracim.lib.base import logger
20
21
# Name of the mail header looked up by DecodedMail.get_special_key().
TRACIM_SPECIAL_KEY_HEADER = 'X-Tracim-Key'

# TODO BS 20171124: Think about replacing this dict config by an object
# Rules applied by DecodedMail when cleaning an html mail body:
#  - a tag matching one of the *_blacklist entries is removed with its content
#  - a tag listed in 'tag_whitelist' is kept, but only the attributes listed
#    in 'attrs_whitelist' survive
#  - any other tag is unwrapped (the tag goes away, its content stays)
BEAUTIFULSOUP_HTML_BODY_PARSE_CONFIG = {
    'tag_blacklist': [
        'script',
        'style',
        'blockquote',
    ],
    'class_blacklist': [
        'moz-cite-prefix',
        'gmail_extra',
        'gmail_quote',
        'yahoo_quoted',
    ],
    'id_blacklist': [
        'reply-intro',
    ],
    'tag_whitelist': [
        'a', 'b', 'strong', 'i', 'br', 'ul', 'li', 'ol',
        'em', 'i', 'u',
        'thead', 'tr', 'td', 'tbody', 'table', 'p', 'pre',
    ],
    'attrs_whitelist': [
        'href',
    ],
}

# Content types DecodedMail accepts as a mail body.
CONTENT_TYPE_TEXT_PLAIN = 'text/plain'
CONTENT_TYPE_TEXT_HTML = 'text/html'
35
36
37
class DecodedMail(object):
    """
    Read-only wrapper around an ``email.message.Message`` giving access to
    decoded headers, to a cleaned html body and to the "key" carried by
    the mail headers.
    """

    def __init__(self, message: Message) -> None:
        """
        :param message: parsed mail to wrap
        """
        self._message = message

    def _decode_header(self, header_title: str) -> typing.Optional[str]:
        """
        Decode an (eventually RFC 2047 encoded) header to a plain string.

        :param header_title: name of the header to read
        :return: decoded header value, None if the header is absent
        """
        # FIXME : Handle exception
        if header_title in self._message:
            return str(make_header(decode_header(self._message[header_title])))
        return None

    def get_subject(self) -> typing.Optional[str]:
        """
        :return: decoded 'subject' header, None if absent
        """
        return self._decode_header('subject')

    def get_from_address(self) -> str:
        """
        :return: address part of the 'From' header
        """
        return parseaddr(self._message['From'])[1]

    def get_to_address(self) -> str:
        """
        :return: address part of the 'To' header
        """
        return parseaddr(self._message['To'])[1]

    def get_first_ref(self) -> str:
        """
        :return: first id listed in the 'References' header (without the
        surrounding angle brackets), empty string if there is none
        """
        references = self._message['References']
        if not references:
            return ''
        # INFO - 'References' holds several whitespace-separated ids while
        # parseaddr() is designed for one single address: isolate the first
        # id before parsing it instead of relying on how parseaddr() deals
        # with the complete list.
        ids = str(references).replace(',', ' ').split()
        if not ids:
            return ''
        return parseaddr(ids[0])[1]

    def get_special_key(self) -> typing.Optional[str]:
        """
        :return: decoded value of the TRACIM_SPECIAL_KEY_HEADER header,
        None if absent
        """
        return self._decode_header(TRACIM_SPECIAL_KEY_HEADER)

    def get_body(self) -> typing.Optional[str]:
        """
        Get the cleaned html body of the mail.

        :return: sanitized html built from the text/html or text/plain
        body part, None if the mail has no usable body part
        """
        body_part = self._get_mime_body_message()
        # INFO - compare to None explicitly: Message defines __len__ (number
        # of headers), so a part without any header is falsy.
        if body_part is None:
            return None

        content_type = body_part.get_content_type()
        if content_type not in (CONTENT_TYPE_TEXT_PLAIN,
                                CONTENT_TYPE_TEXT_HTML):
            return None

        payload = body_part.get_payload(decode=True)
        if payload is None:
            return None

        charset = body_part.get_content_charset('iso-8859-1')
        try:
            text = payload.decode(charset)
        except LookupError:
            # charset name unknown to python: use the same default as the
            # one given to get_content_charset
            text = payload.decode('iso-8859-1')
        except UnicodeDecodeError:
            # keep what is readable instead of losing the whole mail
            text = payload.decode(charset, errors='replace')

        if content_type == CONTENT_TYPE_TEXT_PLAIN:
            return self._parse_txt_body(text)
        return self._parse_html_body(text)

    @classmethod
    def _parse_txt_body(cls, txt_body: str) -> str:
        """
        Keep only the reply part of a plain text body, render it as html
        through markdown, then clean the result like any html body.

        :param txt_body: decoded text/plain body
        :return: sanitized html
        """
        txt_body = EmailReplyParser.parse_reply(txt_body)
        html_body = markdown.markdown(txt_body)
        return cls._parse_html_body(html_body)

    @classmethod
    def _parse_html_body(cls, html_body: str) -> str:
        """
        Clean html according to BEAUTIFULSOUP_HTML_BODY_PARSE_CONFIG:
        blacklisted tags are removed with their content, whitelisted tags
        keep only whitelisted attributes, other tags are unwrapped.

        :param html_body: html to clean
        :return: cleaned html
        """
        soup = BeautifulSoup(html_body, 'html.parser')
        config = BEAUTIFULSOUP_HTML_BODY_PARSE_CONFIG
        for tag in soup.find_all():
            if cls._tag_to_extract(tag):
                tag.extract()
            elif tag.name.lower() in config['tag_whitelist']:
                # NOTE(review): the value of a kept attribute (ex: href) is
                # not checked here - confirm whether url schemes should be
                # restricted for mails coming from untrusted senders.
                # iterate on a copy: attributes are deleted during the loop
                for attr in list(tag.attrs):
                    if attr not in config['attrs_whitelist']:
                        del tag.attrs[attr]
            else:
                tag.unwrap()
        return str(soup)

    @classmethod
    def _tag_to_extract(cls, tag: 'Tag') -> bool:
        """
        :param tag: tag to check
        :return: True if the tag name, one of its classes or its id match
        the blacklists of BEAUTIFULSOUP_HTML_BODY_PARSE_CONFIG
        """
        config = BEAUTIFULSOUP_HTML_BODY_PARSE_CONFIG
        if tag.name.lower() in config['tag_blacklist']:
            return True
        if 'class' in tag.attrs:
            tag_classes = tag.attrs['class']
            if any(elem in tag_classes for elem in config['class_blacklist']):
                return True
        if 'id' in tag.attrs:
            # NOTE(review): when the id is a plain string this is a
            # substring match, not an equality - confirm it is intended.
            tag_id = tag.attrs['id']
            if any(elem in tag_id for elem in config['id_blacklist']):
                return True
        return False

    def _get_mime_body_message(self) -> typing.Optional[Message]:
        """
        Find the part of the mail to use as body: the first text/html part
        which is not an attachment, else the first text/plain part which is
        not an attachment.

        :return: body part, None if no part is suitable
        """
        # TODO - G.M - 2017-11-16 - Use stdlib msg.get_body feature for py3.6+
        for wanted_type in (CONTENT_TYPE_TEXT_HTML, CONTENT_TYPE_TEXT_PLAIN):
            for part in self._message.walk():
                if part.get_content_type() != wanted_type:
                    continue
                content_dispo = str(part.get('Content-Disposition'))
                if 'attachment' in content_dispo.lower():
                    continue
                return part
        # Nothing suitable: do not fall back on whatever part was walked
        # last, it may be an attachment.
        return None

    def get_key(self) -> typing.Optional[str]:
        """
        key is the string contained in some mail headers we need to retrieve.
        First try the special header, then the 'to' header and finally
        the first (oldest) mail-id of the 'references' header.

        :return: key, None if no header holds one
        """
        special_key = self.get_special_key()
        if special_key:
            return special_key

        # A 'to' address without key must not prevent the check of the
        # 'references' header: go on until one address gives a key.
        for address in (self.get_to_address(), self.get_first_ref()):
            if not address:
                continue
            key = self.find_key_from_mail_address(address)
            if key:
                return key

        return None

    @classmethod
    def find_key_from_mail_address(
        cls,
        mail_address: str,
    ) -> typing.Optional[str]:
        """ Parse mail_address-like string
        to retrieve key.

        :param mail_address: user+key@something like string
        :return: key, None if the part before '@' is not made of exactly
        two '+'-separated elements
        """
        username = mail_address.split('@')[0]
        username_data = username.split('+')
        if len(username_data) == 2:
            return username_data[1]
        return None
174
175
176
class MailFetcher(object):
    """
    Daemon-like object which regularly fetches unseen mails from an IMAP
    folder and posts their decoded content to a tracim http endpoint.
    """

    def __init__(
        self,
        host: str,
        port: str,
        user: str,
        password: str,
        use_ssl: bool,
        folder: str,
        delay: int,
        endpoint: str,
        token: str,
    ) -> None:
        """
        Fetch mail from a mailbox folder through IMAP and add their content to
        Tracim through http according to mail Headers.
        Fetch is regular.
        :param host: imap server hostname
        :param port: imap connection port
        :param user: user login of mailbox
        :param password: user password of mailbox
        :param use_ssl: use imap over ssl connection
        :param folder: mail folder where new mail are fetched
        :param delay: seconds to wait before fetching new mail again
        :param endpoint: tracim http endpoint where decoded mail are send.
        :param token: token to authenticate http connexion
        """
        self._connection = None
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.use_ssl = use_ssl
        self.folder = folder
        self.delay = delay
        self.endpoint = endpoint
        self.token = token

        self._is_active = True

    def run(self) -> None:
        """
        Loop until stop() is called: wait `delay` seconds, connect, fetch
        unseen mails, send them to tracim, disconnect.
        Any error of one iteration is logged and the loop goes on.
        """
        while self._is_active:
            time.sleep(self.delay)
            try:
                self._connect()
                messages = self._fetch()
                # TODO - G.M -  2017-11-22 retry sending unsended mail
                # These mails are return by _notify_tracim, flag them with "unseen"
                # or store them until new _notify_tracim call
                cleaned_mails = [DecodedMail(msg) for msg in messages]
                self._notify_tracim(cleaned_mails)
            except Exception as e:
                # TODO - G.M - 2017-11-23 - Identify possible exceptions
                log = 'IMAP error: {}'
                logger.warning(self, log.format(str(e)))
            finally:
                # Release the connection on the error path too, so that an
                # iteration never leaves a half-open connection behind.
                self._disconnect()

    def stop(self) -> None:
        """
        Ask run() to leave its loop; takes effect at the next loop check.
        """
        self._is_active = False

    def _connect(self) -> None:
        """
        Open a new IMAP connection (dropping the previous one if any) and
        log in.

        :raise Exception: whatever the login raised, after logging it and
        releasing the connection
        """
        # TODO - G.M - 2017-11-15 Verify connection/disconnection
        # Are old connexion properly close this way ?
        if self._connection:
            self._disconnect()
        # TODO - G.M - 2017-11-23 Support for predefined SSLContext ?
        # without ssl_context param, tracim use default security configuration
        # which is great in most case.
        if self.use_ssl:
            self._connection = imaplib.IMAP4_SSL(self.host, self.port)
        else:
            self._connection = imaplib.IMAP4(self.host, self.port)

        try:
            self._connection.login(self.user, self.password)
        except Exception as e:
            log = 'IMAP login error: {}'
            logger.warning(self, log.format(str(e)))
            # An unauthenticated connection is useless for the caller:
            # release it and let the error go up instead of going on.
            self._disconnect()
            raise

    def _disconnect(self) -> None:
        """
        Close the selected mailbox (if any) and log out. Never raises on
        IMAP/socket errors and always forgets the connection.
        """
        connection = self._connection
        if not connection:
            return
        # Forget the connection first: if the teardown fails, the next
        # _connect() must not try to tear down this same connection again.
        self._connection = None
        try:
            connection.close()
        except (imaplib.IMAP4.error, OSError) as e:
            # close() fails when no mailbox is selected, which is the
            # normal situation after a failed login or select.
            log = 'IMAP : Unable to close mailbox : {}'
            logger.debug(self, log.format(str(e)))
        try:
            connection.logout()
        except (imaplib.IMAP4.error, OSError) as e:
            log = 'IMAP : Unable to logout : {}'
            logger.debug(self, log.format(str(e)))

    def _fetch(self) -> typing.List[Message]:
        """
        Get news message from mailbox
        :return: list of new mails
        """
        messages = []
        # select mailbox
        rv, data = self._connection.select(self.folder)
        if rv != 'OK':
            log = 'IMAP : Unable to open mailbox : {}'
            logger.debug(self, log.format(str(rv)))
            return messages

        # get mails
        # TODO - G.M -  2017-11-15 Which files to select as new file ?
        # Unseen file or All file from a directory (old one should be
        #  moved/ deleted from mailbox during this process) ?
        rv, found = self._connection.search(None, "(UNSEEN)")
        if rv != 'OK':
            # FIXME : Distinct error from empty mailbox ?
            return messages
        if not found or not found[0]:
            return messages

        # get mail content
        for num in found[0].split():
            # INFO - G.M - 2017-11-23 - Fetch (RFC822) to retrieve all
            # complete mails see example : https://docs.python.org/fr/3.5/library/imaplib.html#imap4-example .  # nopep8
            # Be careful, This method remove also mails from Unseen
            # mails
            rv, fetched = self._connection.fetch(num, '(RFC822)')
            if rv != 'OK':
                log = 'IMAP : Unable to get mail : {}'
                logger.debug(self, log.format(str(rv)))
                continue
            # The mail content is expected as second item of a leading
            # tuple; skip any other answer instead of crashing on it.
            if not fetched or not isinstance(fetched[0], tuple):
                log = 'IMAP : Unable to get mail : {}'
                logger.debug(self, log.format(str(fetched)))
                continue
            messages.append(message_from_bytes(fetched[0][1]))
        return messages

    def _notify_tracim(
        self,
        mails: typing.List['DecodedMail'],
    ) -> typing.List['DecodedMail']:
        """
        Send http request to tracim endpoint
        :param mails: list of mails to send (consumed from its end)
        :return: unsended mails: the mail whose request failed, followed by
        the mails which were not tried after this failure
        """
        unsended_mails = []
        # TODO BS 20171124: Look around mail.get_from_address(), mail.get_key()
        # , mail.get_body() etc ... for raise InvalidEmailError if missing
        #  required informations (actually get_from_address raise IndexError
        #  if no from address for example) and catch it here
        while mails:
            mail = mails.pop()
            msg = {'token': self.token,
                   'user_mail': mail.get_from_address(),
                   'content_id': mail.get_key(),
                   'payload': {
                       'content': mail.get_body(),
                   }}
            # NOTE(review): no timeout is given to requests.post - confirm
            # whether one should be set for this endpoint.
            try:
                r = requests.post(self.endpoint, json=msg)
            # TODO - G.M - Verify exception correctly works
            except requests.exceptions.Timeout as e:
                log = 'Timeout error to transmit fetched mail to tracim : {}'
                logger.error(self, log.format(str(e)))
                unsended_mails.append(mail)
                break
            except requests.exceptions.RequestException as e:
                log = 'Fail to transmit fetched mail to tracim : {}'
                logger.error(self, log.format(str(e)))
                # this mail did not reach tracim either: report it too
                unsended_mails.append(mail)
                break
            if r.status_code not in [200, 204]:
                log = 'bad status code response when sending mail to tracim: {}'  # nopep8
                logger.error(self, log.format(str(r.status_code)))

        # After a failure, mails still queued were never tried: report them
        # in the order they would have been sent.
        unsended_mails.extend(reversed(mails))
        return unsended_mails
336