tweet_those()   F
last analyzed

Complexity

Conditions 15

Size

Total Lines 59

Duplication

Lines 0
Ratio 0 %

Importance

Changes 8
Bugs 1 Features 0
Metric Value
cc 15
c 8
b 1
f 0
dl 0
loc 59
rs 3.1379

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include: Extract Method.

Complexity

Complex functions like tweet_those() often do a lot of different things. To break such a function down, we need to identify a cohesive component within it. A common approach to find such a component is to look for statements and variables that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
"""
3
Wordnik and Twitter utility functions
4
"""
5
6
# ================ GENERAL ==================
7
8
import argparse
9
try:
10
    import ConfigParser as configparser
11
except ImportError:
12
    import configparser
13
import csv
14
import datetime
15
import os
16
import random
17
import re
18
import time
19
20
from wordnik import swagger, AccountApi, WordListApi
21
from twitter import Twitter, OAuth  # pip install twitter
22
23
# For Python 2.x
24
try:
25
    input = raw_input
26
except NameError:
27
    pass
28
29
# Log time
30
print(time.ctime())
31
32
33
# Test mode doesn't actually save csv, ini or update Wordnik or Twitter
34
TEST_MODE = False
35
36
TWEET_CHOICES = (
37
    'none',  # 'none' must be first
38
    'latest', 'latest_onetweet', '24hours', '7days', '30days', 'thisyear',
39
    'alltime',
40
    'retweet', 'random')  # 'retweet' and 'random' must be last
41
42
DAY_IN_SECONDS = 24 * 60 * 60
43
44
45
# Remove duplicates from a list but keep in order
46
# http://stackoverflow.com/questions/480214/
47
def dedupe(seq):
48
    seen = set()
49
    seen_add = seen.add
50
    return [x for x in seq if x not in seen and not seen_add(x)]
51
52
53
# cmd.exe cannot do Unicode so encode first
54
def print_it(text):
55
    print(text.encode('utf-8'))
56
57
58
def do_argparse(description=None):
59
    parser = argparse.ArgumentParser(
60
        description=description,
61
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
62
    parser.add_argument(
63
        '-t', '--tweet', default='latest', choices=TWEET_CHOICES,
64
        help="How to tweet the results.")
65
    return parser
66
67
# The `stuff` list looks like:
68
#     [
69
#     ["I love the word", "I hate the word"], # search term
70
#     [love_max_id, hate_max_id],
71
#     ["twitter-loves", "twitter-hates"] # Wordnik word list permalink
72
#     ]
73
74
75
def load_ini(ini_file, stuff):
76
    config = configparser.ConfigParser()
77
    result = config.read(ini_file)
78
    if result:
79
        # print(summary)
80
        for i in range(len(stuff[1])):
81
            # Load max IDs using permalink as key
82
            stuff[1][i] = config.get("max_ids", stuff[2][i])
83
    return stuff
84
85
    print("Loaded: " + stuff[1])
86
87
88
def save_ini(ini_file, stuff):
89
    print("Save: " + str(stuff[1]))
90
91
    config = configparser.ConfigParser()
92
    config.add_section("max_ids")
93
    for i in range(len(stuff[1])):
94
        # Save max IDs using permalink as key
95
        config.set("max_ids", stuff[2][i], stuff[1][i])
96
97
    if TEST_MODE:
98
        return
99
    with open(ini_file, 'wb') as configfile:
100
        config.write(configfile)
101
102
103
def update_csv(csv_file, search_term, words, statuses):
104
    file_exists = os.path.exists(csv_file)
105
    if TEST_MODE:
106
        return
107
    fd = open(csv_file, 'ab')
108
    try:
109
        writer = csv.writer(fd)
110
        if not file_exists:  # add header
111
            writer.writerow((
112
                'word', 'search_term', 'created_at', 'id_str',
113
                'screen_name', 'user_name', 'text'))
114
        for i, status in enumerate(statuses):
115
            csv_data = [
116
                words[i], search_term, status['created_at'], status['id_str'],
117
                status['user']['screen_name'], status['user']['name'],
118
                status['text'].replace('\n', ' ')]
119
            for i, field in enumerate(csv_data):
120
                csv_data[i] = field.encode('utf8')
121
            writer.writerow(csv_data)
122
123
    finally:
124
        fd.close()
125
126
127
def get_pattern(search_term, target_word_follows_search_term):
128
    # word boundary, one or more word chars, any '-*,
129
    # one or more word chars, word boundary
130
    word_pattern = "[^\w]*(\w+(['-\*]*\w)*)[^\w]*"
131
132
    if target_word_follows_search_term:
133
        # Matches search term ("I love the word")
134
        # followed by whitespace then at least one
135
        # [whitespace, period, exclamation mark, comma,
136
        # brackets, question mark]
137
        # pattern = re.compile(
138
            # search_term + "\s+([^\s.!,()?]+)", re.IGNORECASE)
139
140
        # \s = whitespace
141
        # \w = word characters (a-zA-Z0-9_) but re.UNICODE allows umlauts
142
        # and things search term, whitespace, then any number of non-word
143
        # chars, then begin group: one or more word chars, then any number
144
        # of apostrophes and hyphens as long as they are followed by a word
145
        # char. Then end the group with any number of non-word chars.
146
        pattern = re.compile(
147
            search_term + "\s+" + word_pattern,
148
            re.IGNORECASE | re.UNICODE)
149
    else:
150
        # Matches at least something that's NOT
151
        # [whitespace, period, exclamation mark, comma,
152
        # open bracket, close bracket],
153
        # followed by at least one
154
        # [whitespace, period, exclamation mark, comma]
155
        # and then "is my new etc."
156
        pattern = re.compile(
157
            word_pattern + "\s+" + search_term,
158
            re.IGNORECASE | re.UNICODE)
159
160
    return pattern
161
162
163
def word_from_text(text, pattern, search_term):
164
    """ If matching word found in tweet text, return it. Else return None """
165
    print_it(text)
166
167
    # Ignore retweets
168
    if text.startswith('RT'):
169
        return None
170
171
    # Ignore retweets
172
    if ' RT ' in text and text.find(' RT ') < text.find(search_term):
173
        return None
174
175
    # Ignore tweets beginning with a curly left double quote,
176
    # they're often quoting another person's tweet
177
    if text[0] == u"\u201c":
178
        return None
179
180
    # Ignore, probably quoting another's tweet
181
    # (but don't ignore: '"word" is my new favourite')
182
    if text.startswith('"@'):
183
        return None
184
185
    match = re.search(pattern, text)
186
    if match:
187
        word = match.group(1).lower()
188
189
        if len(word) == 0:
190
            return None
191
192
        # Ignore some common words
193
        if word.lower() in [
194
                "it", "this", "that", "which", "and",
195
                "a", "of", "in", "but", "there"]:
196
            return None
197
198
        # Ignore if any unbalanced brackets
199
        open = 0
200
        for char in word:
201
            if char == "(":
202
                open += 1
203
            elif char == ")":
204
                open -= 1
205
            if open < 0:
206
                return None
207
        if open != 0:
208
            return None
209
210
        # OK, got something
211
        print_it(">" + word + "<")
212
        return word
213
214
    # Nothing found
215
    return None
216
217
218
def extract_words(search_term, target_word_follows_search_term, results):
219
    words = []
220
    statuses = []
221
    pattern = get_pattern(search_term, target_word_follows_search_term)
222
223
    for status in results['statuses']:
224
        # Ignore a Twitter bot
225
        print(status['user']['screen_name'])
226
        print(status['user']['screen_name'] == "unrepedant")
227
        print(type(status['user']['screen_name']), type("unrepedant"))
228
        if status['user']['screen_name'] == "unrepedant":
229
            continue
230
231
        text = status['text']
232
        print("----")
233
234
        word = word_from_text(text, pattern, search_term)
235
        if word is not None:
236
            # print_it(status['user']['screen_name'])
237
            words.append(word)
238
            statuses.append(status)
239
240
    return words, statuses
241
242
243
def find_words(
244
        search_term, target_word_follows_search_term, results, csv_file):
245
    words, statuses = extract_words(
246
        search_term, target_word_follows_search_term, results)
247
    update_csv(csv_file, search_term, words, statuses)
248
    return words
249
250
251
def find_colnum(heading, row):
252
    """Find the coloumn number for a given heading"""
253
    # Find word column
254
    found_colnum = None
255
    for colnum, col in enumerate(row):
256
        if heading == col:
257
            found_colnum = colnum
258
            break
259
    return found_colnum
260
261
262
def words_and_ids_from_csv(csv_file, search_term, seconds_delta=None):
263
    """Load the CSV and return a random ID from the given time period"""
264
    cutoff = 0
265
    if seconds_delta:
266
        epoch_time = int(time.time())
267
        cutoff = epoch_time - seconds_delta
268
269
    word_colnum, searchterm_colnum, created_at_colnum = None, None, None
270
    matched_words, eligible_ids = [], []
271
    seen = set()  # avoid duplicates
272
    ifile = open(csv_file, "r")
273
    reader = csv.reader(ifile)
274
275
    for rownum, row in enumerate(reader):
276
        # Save header row
277
        if rownum == 0:
278
            # Find columns
279
            word_colnum = find_colnum("word", row)
280
            searchterm_colnum = find_colnum("search_term", row)
281
            created_at_colnum = find_colnum("created_at", row)
282
            text_colnum = find_colnum("text", row)
283
            id_str_colnum = find_colnum("id_str", row)
284
285
        else:  # not header
286
            if not row:
287
                continue
288
            # Avoid duplicates
289
            if row[id_str_colnum] in seen:
290
                continue
291
            seen.add(row[id_str_colnum])
292
293
            # Kill the spambot!
294
            if row[searchterm_colnum] != search_term:
295
                continue
296
            text = row[text_colnum]
297
            if text[0] == "@" and \
298
                    "I love the word douchebag. http://t.co/" in text:
299
                # print(row[text_colnum])
300
                continue
301
302
            # seconds since epoch:
303
            timestamp = time.mktime(time.strptime(
304
                row[created_at_colnum], '%a %b %d %H:%M:%S +0000 %Y'))
305
            if timestamp > cutoff:
306
                eligible_ids.append(row[id_str_colnum])
307
                matched_words.append(row[word_colnum].decode('utf-8'))
308
309
    ifile.close()
310
311
    return eligible_ids, matched_words
312
313
314
def pick_a_random_tweet(csv_file, search_term, seconds_delta=None):
315
    """Load the CSV and return a random ID from the given time period"""
316
317
    eligible_ids, matched_words = words_and_ids_from_csv(csv_file, search_term,
318
                                                         seconds_delta)
319
320
    # Return a random ID
321
    return random.choice(eligible_ids)
322
323
324
def load_words_from_csv(csv_file, search_term, seconds_delta=None):
325
    """Load the CSV and return the top words for a given time period"""
326
327
    eligible_ids, matched_words = words_and_ids_from_csv(csv_file, search_term,
328
                                                         seconds_delta)
329
330
    import most_frequent_words
331
    # Max tweet length is 140
332
    # Let's naively set an upper limit of 140/3:
333
    # one-character word, comma and space
334
    top_words = most_frequent_words.most_frequent_words(matched_words, 140/3)
335
    return top_words
336
337
338
# ================= WORDNIK ==================
339
340
# Wordnik: get API key at http://developer.wordnik.com/
341
WORDNIK_API_KEY = "3fd3445662c1ac873962d06094f057f39d4711730e1adc28f"
342
WORDNIK_USERNAME = "hugovk"
343
WORDNIK_PASSWORD = "mytopsecretwordnikpassword"
344
WORDNIK_TOKEN = None
345
346
wordnik_client = swagger.ApiClient(
347
    WORDNIK_API_KEY, 'http://api.wordnik.com/v4')
348
wordListApi = WordListApi.WordListApi(wordnik_client)
349
350
351
# TODO: Save token to ini file
352
def get_wordnik_token():
353
    import getpass
354
    if WORDNIK_USERNAME:
355
        my_username = WORDNIK_USERNAME
356
    else:
357
        my_username = input("Enter your Wordnik username: ")
358
    if WORDNIK_PASSWORD:
359
        my_password = WORDNIK_PASSWORD
360
    else:
361
        my_password = getpass.getpass("Enter your Wordnik password: ")
362
363
    account_api = AccountApi.AccountApi(wordnik_client)
364
    result = account_api.authenticate(my_username, my_password)
365
    token = result.token
366
    print("Your Wordnik token is: " + token)
367
    return token
368
369
370
def add_to_wordnik(words, wordlist_permalink):
371
    if len(words) == 0:
372
        return
373
374
    if TEST_MODE:
375
        return
376
377
    global WORDNIK_TOKEN
378
    if WORDNIK_TOKEN is None:
379
        # Only need to do this once
380
        WORDNIK_TOKEN = get_wordnik_token()
381
382
    words.sort()
383
    print_it("Words: " + ', '.join(words))
384
    if len(words) == 1:
385
        number = "1 word"
386
    else:
387
        number = str(len(words)) + " words"
388
    print("Adding " + number + " to Wordnik list:" + wordlist_permalink)
389
390
    from wordnik.models import StringValue
391
    words_to_add = []
392
    for word in words:
393
        word_to_add = StringValue.StringValue()
394
        word_to_add.word = word
395
        words_to_add.append(word_to_add)
396
397
    print_it(wordlist_permalink + " " + WORDNIK_TOKEN + " " + " ".join(words))
398
399
    wordListApi.addWordsToWordList(
400
        wordlist_permalink, WORDNIK_TOKEN, body=words_to_add)
401
402
    print(number + " added")
403
404
405
# ================ TWITTER ==================
406
407
t = None
408
409
410
def init_twitter(oauth_token, oauth_secret, consumer_key, consumer_secret):
411
    global t
412
    t = Twitter(auth=OAuth(oauth_token, oauth_secret,
413
                           consumer_key, consumer_secret))
414
415
416
def get_words_from_twitter(search_term, since_id=0):
417
    results = t.search.tweets(
418
        q='"' + search_term + '"', count=100, since_id=int(since_id))
419
420
    print(results['search_metadata'])
421
    print("Requested:\t" + str(results['search_metadata']['count']))
422
    print("Found:\t" + str(len(results['statuses'])))
423
    max_id = results['search_metadata']['max_id']
424
    print("Max ID:\t" + str(max_id))
425
426
    return max_id, results
427
428
429
def retweet(id, trim_user=True):
430
    print_it("RETWEET THIS: " + str(id))
431
432
    if not TEST_MODE:
433
        try:
434
            t.statuses.retweet(id=id, trim_user=trim_user)
435
        except Exception as e:
436
            print(str(e))
437
            # TODO If the account is now protected, we get an error like...
438
            # Twitter sent status 403 for URL: 1.1/statuses/retweet/
439
            # 012345678901234567.json using parameters: ...
440
            # details: {"errors":"sharing is not permissible for this status
441
            # (Share validations failed)"}
442
            # ... so could try another.
443
444
445
def tweet_string(string):
446
    if len(string) <= 0:
447
        return
448
    if len(string) + 1 <= 140:  # Finish properly, if there's room
449
        string += "."
450
451
    print_it("TWEET THIS: " + string)
452
453
    if not TEST_MODE:
454
        try:
455
            t.statuses.update(status=string)
456
        except Exception as e:
457
            print(str(e))
458
459
460
def update_tweet_with_words(tweet, words):
461
    """
462
    IN: tweet with a prefix, list of words
463
    OUT: updated tweet, list of words_remaining
464
    """
465
    new_tweet = tweet
466
    words_remaining = list(words)
467
    for i, word in enumerate(words):
468
        if i == 0:
469
            new_tweet = tweet + word
470
        else:
471
            # new_tweet = tweet + ", " + word
472
            new_tweet = tweet + " " + word
473
        if len(new_tweet) > 140:
474
            break
475
        else:
476
            tweet = new_tweet
477
        words_remaining.pop(0)
478
    return tweet, words_remaining
479
480
481
def tweet_those(
482
        words, tweet_prefix, csv_file=None, search_term=None, mode="latest"):
483
    # Remove duplicates
484
    words = dedupe(words)
485
486
    shuffle, tweet_all_words = False, False
487
    extra_prefix = ""
488
489
    if mode == "retweet":
490
        id = pick_a_random_tweet(csv_file, search_term, 2 * DAY_IN_SECONDS)
491
        retweet(id)
492
        return
493
    elif mode == "none":
494
        return
495
    elif mode == "latest":
496
        tweet_all_words = True
497
    elif mode == "latest_onetweet":
498
        shuffle = True
499
    elif mode == "24hours":
500
        words = load_words_from_csv(csv_file, search_term, DAY_IN_SECONDS)
501
        extra_prefix += " (24 hours)"
502
    elif mode == "7days":
503
        words = load_words_from_csv(csv_file, search_term, 7 * DAY_IN_SECONDS)
504
        extra_prefix += " (7 days)"
505
    elif mode == "30days":
506
        words = load_words_from_csv(csv_file, search_term, 30 * DAY_IN_SECONDS)
507
        extra_prefix += " (30 days)"
508
    elif mode == "thisyear":
509
        # How many seconds since 1 Jan this year?
510
        now = datetime.datetime.now()
511
        year_start = datetime.datetime(now.year, month=1, day=1)
512
        seconds_delta = (now - year_start).total_seconds()
513
        words = load_words_from_csv(csv_file, search_term, seconds_delta)
514
        extra_prefix += " (" + str(now.year) + ")"
515
    elif mode == "alltime":
516
        words = load_words_from_csv(csv_file, search_term, None)
517
        extra_prefix += " (all time)"
518
    else:
519
        print("Unknown mode: " + mode)
520
        return
521
522
    if len(words) < 1:  # validation
523
        return
524
525
    if shuffle:
526
        random.shuffle(words)
527
528
    tweet = tweet_prefix
529
    if len(words) == 1:  # get the plural right
530
        tweet += extra_prefix + ": "
531
    else:
532
        tweet += "s" + extra_prefix + ": "
533
534
    tweet, words_remaining = update_tweet_with_words(tweet, words)
535
536
    tweet_string(tweet)
537
538
    if tweet_all_words and len(words_remaining) > 0:
539
        tweet_those(words_remaining, tweet_prefix)
540
541
# End of file
542