ServiceTwitter.read_data()   F
last analyzed

Complexity

Conditions 22

Size

Total Lines 153

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 22
c 2
b 0
f 0
dl 0
loc 153
rs 2

1 Method

Rating   Name   Duplication   Size   Complexity  
B ServiceTwitter._get_tweets() 0 62 5

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like ServiceTwitter.read_data() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# coding: utf-8
2
import arrow
3
4
# Twitter lib
5
from twython import Twython, TwythonAuthError, TwythonRateLimitError
6
7
# django classes
8
from django.conf import settings
9
from django.utils import html
10
from django.utils.translation import ugettext as _
11
from django.core.cache import caches
12
13
# django_th classes
14
from django_th.services.services import ServicesMgr
15
from django_th.models import update_result, UserService
16
from th_twitter.models import Twitter
17
18
from logging import getLogger
19
20
"""
21
    handle process with twitter
22
    put the following in settings.py
23
24
    TH_TWITTER = {
25
        'consumer_key': 'abcdefghijklmnopqrstuvwxyz',
26
        'consumer_secret': 'abcdefghijklmnopqrstuvwxyz',
27
    }
28
29
"""
30
logger = getLogger('django_th.trigger_happy')
31
cache = caches['django_th']
32
33
34
class ServiceTwitter(ServicesMgr):
35
    """
36
        Service Twitter
37
    """
38
    def __init__(self, token=None, **kwargs):
39
        """
40
41
        :param token:
42
        :param kwargs:
43
        """
44
        super(ServiceTwitter, self).__init__(token, **kwargs)
45
        self.consumer_key = settings.TH_TWITTER_KEY['consumer_key']
46
        self.consumer_secret = settings.TH_TWITTER_KEY['consumer_secret']
47
        self.token = token
48
        self.oauth = 'oauth1'
49
        self.service = 'ServiceTwitter'
50
        if self.token is not None:
51
            token_key, token_secret = self.token.split('#TH#')
52
            try:
53
                self.twitter_api = Twython(self.consumer_key,
54
                                           self.consumer_secret,
55
                                           token_key, token_secret)
56
            except (TwythonAuthError, TwythonRateLimitError) as e:
57
                us = UserService.objects.get(token=token)
58
                logger.error(e.msg, e.error_code)
59
                update_result(us.trigger_id, msg=e.msg, status=False)
60
61
    def read_data(self, **kwargs):
62
        """
63
            get the data from the service
64
65
            :param kwargs: contain keyword args : trigger_id at least
66
            :type kwargs: dict
67
            :rtype: list
68
        """
69
        twitter_status_url = 'https://www.twitter.com/{}/status/{}'
70
        twitter_fav_url = 'https://www.twitter.com/{}/status/{}'
71
        now = arrow.utcnow().to(settings.TIME_ZONE)
72
        my_tweets = []
73
        search = {}
74
        since_id = None
75
        trigger_id = kwargs['trigger_id']
76
        date_triggered = arrow.get(kwargs['date_triggered'])
77
78
        def _get_tweets(twitter_obj, search):
79
            """
80
                get the tweets from twitter and return the filters to use :
81
                search and count
82
83
                :param twitter_obj: from Twitter model
84
                :param search: filter used for twython.search() or
85
                twython.get_user_timeline())
86
                :type twitter_obj: Object
87
                :type search: dict
88
                :return: count that limit the quantity of tweet to retrieve,
89
                the filter named search, the tweets
90
                :rtype: list
91
            """
92
93
            """
94
                explanations about statuses :
95
                when we want to track the tweet of a screen
96
                statuses contain all of them
97
                when we want to track all the tweet matching a tag
98
                statuses contain statuses + metadata array
99
                this is why we need to do
100
                statuses = statuses['statuses']
101
                to be able to handle the result as for screen_name
102
            """
103
104
            # get the tweets for a given tag
105
            # https://dev.twitter.com/docs/api/1.1/get/search/tweets
106
            statuses = ''
107
            count = 100
108
            if twitter_obj.tag:
109
                count = 100
110
                search['count'] = count
111
                search['q'] = twitter_obj.tag
112
                search['result_type'] = 'recent'
113
                # do a search
114
                statuses = self.twitter_api.search(**search)
115
                # just return the content of te statuses array
116
                statuses = statuses['statuses']
117
118
            # get the tweets from a given user
119
            # https://dev.twitter.com/docs/api/1.1/get/statuses/user_timeline
120
            elif twitter_obj.screen:
121
                count = 200
122
                search['count'] = count
123
                search['screen_name'] = twitter_obj.screen
124
125
                # call the user timeline and get his tweet
126
                try:
127
                    if twitter_obj.fav:
128
                        count = 20
129
                        search['count'] = 20
130
                        # get the favorites https://dev.twitter.com/rest/
131
                        # reference/get/favorites/list
132
                        statuses = self.twitter_api.get_favorites(**search)
133
                    else:
134
                        statuses = self.twitter_api.get_user_timeline(**search)
135
                except TwythonAuthError as e:
136
                    logger.error(e.msg, e.error_code)
137
                    update_result(trigger_id, msg=e.msg, status=False)
138
139
            return count, search, statuses
140
141
        if self.token is not None:
142
            kw = {'app_label': 'th_twitter',
143
                  'model_name': 'Twitter',
144
                  'trigger_id': trigger_id}
145
            twitter_obj = super(ServiceTwitter, self).read_data(**kw)
146
147
            # https://dev.twitter.com/rest/public/timelines
148
            if twitter_obj.since_id is not None and twitter_obj.since_id > 0:
149
                since_id = twitter_obj.since_id
150
                search = {'since_id': twitter_obj.since_id}
151
152
            # first request to Twitter
153
            count, search, statuses = _get_tweets(twitter_obj, search)
154
155
            if len(statuses) > 0:
156
                newest = None
157
                for status in statuses:
158
                    if newest is None:
159
                        newest = True
160
                        # first query ; get the max id
161
                        search['max_id'] = max_id = status['id']
162
163
                since_id = search['since_id'] = statuses[-1]['id'] - 1
164
165
                count, search, statuses = _get_tweets(twitter_obj, search)
166
167
                newest = None
168
                if len(statuses) > 0:
169
                    my_tweets = []
170
                    for s in statuses:
171
                        if newest is None:
172
                            newest = True
173
                            max_id = s['id'] - 1
174
                        screen_name = s['user']['screen_name']
175
                        # get the text of the tweet + url to this one
176
                        if twitter_obj.fav:
177
                            url = twitter_fav_url.format(screen_name,
178
                                                         s['id_str'])
179
                            title = _('Tweet Fav from @{}'.format(screen_name))
180
                        else:
181
                            url = twitter_status_url.format(screen_name,
182
                                                            s['id_str'])
183
                            title = _('Tweet from @{}'.format(screen_name))
184
                        # Wed Aug 29 17:12:58 +0000 2012
185
                        my_date = arrow.get(s['created_at'],
186
                                            'ddd MMM DD HH:mm:ss Z YYYY')
187
                        published = arrow.get(my_date).to(settings.TIME_ZONE)
188
                        if date_triggered is not None and \
189
                           published is not None and \
190
                           now >= published >= date_triggered:
191
                            if s.get('extended_entities'):
192
                                # get a media
193
                                extended_entities = s['extended_entities']
194
                                if extended_entities.get('media'):
195
                                    medias = extended_entities.get('media')
196
                                    for media in medias:
197
                                        text = s['text'] + ' ' + \
198
                                               media.get('media_url_https')
199
                            else:
200
                                text = s['text']
201
202
                            my_tweets.append({'title': title,
203
                                              'content': text,
204
                                              'link': url,
205
                                              'my_date': my_date})
206
                            # digester
207
                            self.send_digest_event(trigger_id, title, url)
208
                    cache.set('th_twitter_' + str(trigger_id), my_tweets)
209
                    Twitter.objects.filter(trigger_id=trigger_id).update(
210
                        since_id=since_id,
211
                        max_id=max_id,
212
                        count=count)
213
        return my_tweets
214
215
    def save_data(self, trigger_id, **data):
216
        """
217
            let's save the data
218
219
            :param trigger_id: trigger ID from which to save data
220
            :param data: the data to check to be used and save
221
            :type trigger_id: int
222
            :type data:  dict
223
            :return: the status of the save statement
224
            :rtype: boolean
225
        """
226
        status = False
227
        # set the title and content of the data
228
        title, content = super(ServiceTwitter, self).save_data(
229
            trigger_id, **data)
230
231
        if data.get('link') and len(data.get('link')) > 0:
232
            # remove html tag if any
233
            content = html.strip_tags(content)
234
235
            if self.title_or_content(title):
236
237
                content = str("{title} {link}").format(
238
                    title=title, link=data.get('link'))
239
240
                content += self.get_tags(trigger_id)
241
            else:
242
                content = self.set_twitter_content(content)
243
244
            try:
245
                self.twitter_api.update_status(status=content)
246
                status = True
247
            except Exception as inst:
248
                logger.critical("Twitter ERR {}".format(inst))
249
                update_result(trigger_id, msg=inst, status=False)
250
                status = False
251
        return status
252
253
    def get_tags(self, trigger_id):
254
        """
255
        get the tags if any
256
        :param trigger_id: the id of the related trigger
257
        :return: tags string
258
        """
259
260
        # get the Twitter data of this trigger
261
        trigger = Twitter.objects.get(trigger_id=trigger_id)
262
263
        tags = ''
264
265
        if len(trigger.tag) > 0:
266
            # is there several tag ?
267
            tags = ["#" + tag.strip() for tag in trigger.tag.split(',')
268
                    ] if ',' in trigger.tag else "#" + trigger.tag
269
270
            tags = str(','.join(tags)) if isinstance(tags, list) else tags
271
            tags = ' ' + tags
272
273
        return tags
274
275
    def auth(self, request):
276
        """
277
        build the request to access to the Twitter
278
        website with all its required parms
279
        :param request: makes the url to call Twitter + the callback url
280
        :return: go to the Twitter website to ask to the user
281
        to allow the access of TriggerHappy
282
        """
283
        callback_url = self.callback_url(request)
284
285
        twitter = Twython(self.consumer_key, self.consumer_secret)
286
287
        req_token = twitter.get_authentication_tokens(
288
            callback_url=callback_url)
289
        request.session['oauth_token'] = req_token['oauth_token']
290
        request.session['oauth_token_secret'] = req_token['oauth_token_secret']
291
292
        return req_token['auth_url']
293
294
    def callback(self, request, **kwargs):
295
        """
296
            Called from the Service when the user accept to activate it
297
        """
298
        return super(ServiceTwitter, self).callback(request, **kwargs)
299
300
    def get_access_token(
301
        self, oauth_token, oauth_token_secret, oauth_verifier
302
    ):
303
        """
304
        :param oauth_token: oauth_token retrieve by the API Twython
305
        get_authentication_tokens()
306
        :param oauth_token_secret: oauth_token_secret retrieve by the
307
        API Twython get_authentication_tokens()
308
        :param oauth_verifier: oauth_verifier retrieve from Twitter
309
        :type oauth_token: string
310
        :type oauth_token_secret: string
311
        :type oauth_verifier: string
312
        :return: access_token
313
        :rtype: dict
314
        """
315
        twitter = Twython(self.consumer_key,
316
                          self.consumer_secret,
317
                          oauth_token,
318
                          oauth_token_secret)
319
        access_token = twitter.get_authorized_tokens(oauth_verifier)
320
        return access_token
321
322
    def title_or_content(self, title):
323
        """
324
        If the title always contains 'New status from'
325
        drop the title and get 'the content' instead
326
        :param title:
327
        :return:
328
        """
329
        return "Toot from" not in title
330
331
    def set_twitter_content(self, content):
332
        """
333
        cleaning content by removing any existing html tag
334
        :param content:
335
        :return:
336
        """
337
        content = html.strip_tags(content)
338
339
        if len(content) > 140:
340
            return content[:140]
341
342
        return content
343