Completed
Push — master ( d9b878...aeb6aa )
by Fox
12s
created

ServiceMastodon.media_in_content()   A

Complexity

Conditions 3

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
c 1
b 0
f 0
dl 0
loc 20
rs 9.4285
1
# coding: utf-8
2
import arrow
3
from logging import getLogger
4
5
from mastodon import Mastodon as MastodonAPI
6
7
# django classes
8
from django.conf import settings
9
from django.core.cache import caches
10
from django.shortcuts import reverse
11
from django.utils import html
12
from django.utils.translation import ugettext as _
13
14
# django_th classes
15
from django_th.services.services import ServicesMgr
16
from django_th.models import update_result, UserService
17
from django_th.tools import download_image
18
from th_mastodon.models import Mastodon
19
20
import re
21
22
# Module-level logger, registered under the 'django_th.trigger_happy' name.
logger = getLogger('django_th.trigger_happy')
23
24
# Django cache looked up by the 'django_th' alias; ServiceMastodon.read_data()
# stores the toots it fetched in it under a 'th_mastodon_<trigger_id>' key.
cache = caches['django_th']
25
26
27
class ServiceMastodon(ServicesMgr):
    """
        Service Mastodon

        Reads toots from, and publishes toots to, the Mastodon instance
        whose address is stored on the user's ``UserService`` row
        (``us.host``).
    """

    # Patterns used by media_in_content() to recognise Twitter links.
    _TCO_LINK_RE = re.compile(r'https://t\.co/(\w+)')
    _TWIMG_MEDIA_RE = re.compile(
        r'https://pbs\.twimg\.com/media/([\w\-_]+)\.jpg')

    # Longest content kept by set_mastodon_content().
    _MAX_CONTENT_LENGTH = 560

    def __init__(self, token=None, **kwargs):
        """
            :param token: access token of the user for this service
            :param kwargs: keyword args ; 'user' is read when present
            :type kwargs: dict
        """
        super(ServiceMastodon, self).__init__(token, **kwargs)

        self.token = token
        self.service = 'ServiceMastodon'
        self.user = kwargs.get('user')

    @staticmethod
    def _get_api(us, access_token):
        """
            build the Mastodon API client from a UserService row

            :param us: UserService holding client_id, client_secret, host
            :param access_token: token used to authenticate the calls
            :return: a MastodonAPI instance
        """
        return MastodonAPI(
            client_id=us.client_id,
            client_secret=us.client_secret,
            access_token=access_token,
            api_base_url=us.host,
        )

    @staticmethod
    def _redirect_uri(request):
        """
            build the absolute URL of the 'mastodon_callback' view

            :param request: the current request
            :rtype: string
        """
        return '%s://%s%s' % (request.scheme, request.get_host(),
                              reverse('mastodon_callback'))

    @staticmethod
    def _get_toots(toot_api, toot_obj, search):
        """
            get the toots from mastodon and return the filters to use

            :param toot_api: the Mastodon API client
            :param toot_obj: from Mastodon model
            :param search: filter used for MastodonAPI.search()
            :type search: dict
            :return: the filter named search, the toots
            :rtype: tuple
        """
        max_id = 0 if toot_obj.max_id is None else toot_obj.max_id
        since_id = 0 if toot_obj.since_id is None else toot_obj.since_id
        # stays an empty (falsy) value when neither a tag nor a tooter
        # is set on the trigger
        statuses = ''

        if toot_obj.tag:
            # get the toots for a given tag
            search['q'] = toot_obj.tag
            # only the 'statuses' array of the search result is used
            statuses = toot_api.search(**search)['statuses']
        elif toot_obj.tooter:
            # get the toots from a given user
            search['id'] = toot_obj.tooter
            if toot_obj.fav:
                statuses = toot_api.favourites(max_id=max_id,
                                               since_id=since_id)
            else:
                user_id = toot_api.account_search(q=toot_obj.tooter)
                # unlike favourites(), the raw (possibly None) ids of
                # the trigger are passed here
                statuses = toot_api.account_statuses(
                    id=user_id[0]['id'], max_id=toot_obj.max_id,
                    since_id=toot_obj.since_id)

        return search, statuses

    def read_data(self, **kwargs):
        """
            get the data from the service

            :param kwargs: contain keyword args : trigger_id and
                           date_triggered at least
            :type kwargs: dict
            :return: one dict per toot, with the keys 'title',
                     'content', 'link' and 'my_date'
            :rtype: list
        """
        now = arrow.utcnow().to(settings.TIME_ZONE)
        my_toots = []
        trigger_id = kwargs['trigger_id']
        date_triggered = arrow.get(kwargs['date_triggered'])

        if self.token is None:
            return my_toots

        kw = {'app_label': 'th_mastodon', 'model_name': 'Mastodon',
              'trigger_id': trigger_id}
        toot_obj = super(ServiceMastodon, self).read_data(**kw)

        us = UserService.objects.get(token=self.token,
                                     name='ServiceMastodon')
        try:
            toot_api = self._get_api(us, self.token)
        except ValueError as e:
            logger.error(e)
            update_result(trigger_id, msg=e, status=False)
            # without a client nothing can be fetched ; going on would
            # only fail on the unbound toot_api
            return my_toots

        search = {}
        if toot_obj.since_id is not None and toot_obj.since_id > 0:
            search = {'since_id': toot_obj.since_id}

        # first request to Mastodon
        search, statuses = self._get_toots(toot_api, toot_obj, search)
        if not statuses:
            return my_toots

        # first query ; the first status gives the max id, the last
        # one the since id
        search['max_id'] = statuses[0]['id']
        since_id = search['since_id'] = statuses[-1]['id'] - 1

        # second request, bounded by the ids found above
        search, statuses = self._get_toots(toot_api, toot_obj, search)
        if not statuses:
            return my_toots

        max_id = statuses[0]['id'] - 1
        for s in statuses:
            toot_name = s['account']['username']
            # translate the pattern first, then fill it in, so that the
            # message id can match an entry of the catalog
            title = _('Toot from <a href="{}">@{}</a>').format(
                us.host, toot_name)

            my_date = arrow.get(s['created_at']).to(settings.TIME_ZONE)
            # keep only the toots published since the last run
            if date_triggered <= my_date <= now:
                my_toots.append({'title': title,
                                 'content': s['content'],
                                 'link': s['url'],
                                 'my_date': my_date})
                # digester
                self.send_digest_event(trigger_id, title, s['url'])

        cache.set('th_mastodon_' + str(trigger_id), my_toots)
        Mastodon.objects.filter(trigger_id=trigger_id).update(
            since_id=since_id, max_id=max_id)
        return my_toots

    def save_data(self, trigger_id, **data):
        """
            publish the data as a toot

            :param trigger_id: id of the trigger
            :params data, dict ; 'link' is read when present
            :return: True when the toot has been posted, False otherwise
            :rtype: boolean
        """
        title, content = super(ServiceMastodon, self).save_data(
            trigger_id, **data)
        # a missing link must not break the string concatenation below
        link = data.get('link') or ''

        # check if we have a 'good' title
        if self.title_or_content(title):
            content = "{title} {link}".format(title=title, link=link)
            content += self.get_tags(trigger_id)
        # if not then use the content
        else:
            content += " " + link
            content += " " + self.get_tags(trigger_id)

        content = self.set_mastodon_content(content)

        us = UserService.objects.get(user=self.user,
                                     token=self.token,
                                     name='ServiceMastodon')

        try:
            toot_api = self._get_api(us, self.token)
        except ValueError as e:
            logger.error(e)
            update_result(trigger_id, msg=e, status=False)
            # no client, nothing can be posted
            return False

        try:
            media = None
            if settings.DJANGO_TH['sharing_media']:
                # do we have a media in the content ?
                content, media = self.media_in_content(content)
            if media:
                # upload the media first
                media_ids = toot_api.media_post(media_file=media)
                toot_api.status_post(content, media_ids=[media_ids])
            else:
                # no media (or sharing disabled) : plain toot
                toot_api.toot(content)
        except Exception as inst:
            logger.critical("Mastodon ERR {}".format(inst))
            update_result(trigger_id, msg=inst, status=False)
            return False
        return True

    def get_tags(self, trigger_id):
        """
        get the tags if any
        :param trigger_id: the id of the related trigger
        :return: tags string ; each tag prefixed with '#', the tags
                 joined with ',' and the whole preceded by a space,
                 or an empty string when there is no tag
        """
        # get the Mastodon data of this trigger
        trigger = Mastodon.objects.get(trigger_id=trigger_id)

        # None and empty string both mean "no tag"
        if not trigger.tag:
            return ''

        # is there several tag ? drop the empty pieces such as the one
        # left by a trailing comma
        names = [name.strip() for name in trigger.tag.split(',')]
        hashtags = ['#' + name for name in names if name]
        if not hashtags:
            return ''
        return ' ' + ','.join(hashtags)

    def title_or_content(self, title):
        """
        If the title always contains 'Tweet from'
        drop the title and get 'the content' instead
        this allow to have a complet content and not
        just a little piece of string
        :param title: the title to check
        :return: True when the title can be used as is
        """
        return "Tweet from" not in title

    def media_in_content(self, content):
        """
        check if the content contains and url of an image
        for the moment, check twitter media url
        could be elaborate with other service when needed
        :param content: the text to publish
        :return: the content without the t.co and media urls, and
                 what download_image() returned for the first media
                 url (presumably a local file - to be confirmed), or
                 None when no media url was found
        :rtype: tuple
        """
        local_file = None
        if 'https://t.co' in content:
            content = self._TCO_LINK_RE.sub('', content)

        m = self._TWIMG_MEDIA_RE.search(content)
        # m is None when there is no twitter media url, or when the
        # url is not a .jpg one
        if m is not None:
            url = 'https://pbs.twimg.com/media/{}.jpg'.format(m.group(1))
            local_file = download_image(url)
            content = self._TWIMG_MEDIA_RE.sub('', content)

        # always a pair, the caller unpacks two values
        return content, local_file

    def set_mastodon_content(self, content):
        """
        cleaning content by removing any existing html tag
        and keeping its first 560 characters at most
        :param content: the text to clean
        :return: the cleaned text
        """
        return html.strip_tags(content)[:self._MAX_CONTENT_LENGTH]

    def auth(self, request):
        """
            get the auth of the services
            :param request: contains the current session
            :type request: dict
            :return: the url given by callback_url()
            :rtype: string
        """
        # create app
        us = UserService.objects.get(user=request.user,
                                     name='ServiceMastodon')
        client_id, client_secret = MastodonAPI.create_app(
            client_name="TriggerHappy", api_base_url=us.host,
            redirect_uris=self._redirect_uri(request))

        # saved right away, before trying to log in
        us.client_id = client_id
        us.client_secret = client_secret
        us.save()

        # get the token by logging in
        mastodon = MastodonAPI(
            client_id=client_id,
            client_secret=client_secret,
            api_base_url=us.host
        )
        us.token = mastodon.log_in(username=us.username,
                                   password=us.password)
        us.save()
        return self.callback_url(request)

    def callback_url(self, request):
        """
            get the url where the user authorizes the application
            :param request: contains the current session
            :return: the url returned by MastodonAPI.auth_request_url()
        """
        us = UserService.objects.get(user=request.user,
                                     name='ServiceMastodon')
        mastodon = self._get_api(us, us.token)
        return mastodon.auth_request_url(
            redirect_uris=self._redirect_uri(request))

    def callback(self, request, **kwargs):
        """
            Called from the Service when the user accept to activate it
            the url to go back after the external service call
            :param request: contains the current session
            :param kwargs: keyword args
            :type request: dict
            :type kwargs: dict
            :return: the name of the template to render
            :rtype: string
        """
        return 'mastodon/callback.html'
340