import urllib
import urllib.parse

from requests import Session, Response
from yurl import URL

from tumdlr import __version__
from tumdlr.containers import TumblrPost, TumblrPhotoSet, TumblrVideoPost
from tumdlr.errors import TumdlrParserError


class TumblrBlog:
    """
    Client for a single Tumblr blog, backed by the Tumblr v2 API.

    Constructing an instance immediately performs one API request, which
    fills in the blog metadata attributes and buffers the first page of posts.
    """

    def __init__(self, url, session=None, **kwargs):
        """
        Tumblr blog

        Args:
            url(URL|str): Tumblr profile URL
            session(Optional[Session]): An optional custom Requests session

        Keyword Args:
            api_key(str): Tumblr API key
            user_agent(str): User-Agent template; ``{version}`` is replaced with the tumdlr version.
                Only applied when no custom session is supplied.
        """
        self._url = url if isinstance(url, URL) else URL(url)
        self._api_url = URL(scheme='https', host='api.tumblr.com', path='/v2/')
        self._api_response = None  # type: Response
        # NOTE(review): a default API key is embedded in the source; callers can override it via api_key.
        self._api_key = kwargs.get('api_key', 'fuiKNFp9vQFvjLNvx4sUwti4Yb5yGutBN4Xh10LXZhhRKjWlV4')
        self._uagent = kwargs.get('user_agent', 'tumdlr/{version}')

        if session is None:
            session = Session()
            session.headers.update({
                # Keep URL delimiters intact: quote() with its default safe set would turn
                # "http://" into "http%3A//" and produce an invalid Referer.
                'Referer': urllib.parse.quote(self._url.as_string(), safe="/:?&=#%"),
                'User-Agent': self._uagent.format(version=__version__)
            })

        self.session = session

        # Blog metadata, populated by _api_parse_response()
        self.title = None  # type: str
        self.url = None  # type: URL
        self.name = None  # type: str
        self.description = None  # type: str
        self.is_nsfw = None  # type: bool
        self.likes = None  # type: int|False
        self.post_count = None  # type: int
        self.updated = None  # type: int

        self._posts = []  # Parsed posts that have been fetched but not yet yielded by posts()
        self.offset = 0  # Value sent as the "offset" query parameter on the next API request

        self._api_url = self._api_url.replace(
            path=self._api_url.path + 'blog/{host}/posts'.format(host=self._url.host)
        )
        self._api_get()

    def _api_get(self, query=None, parse=True):
        """
        Execute an API query

        The request always carries the API key, ``filter=text`` and the current offset; anything in ``query``
        is appended after those. The response is stored on the instance before it is (optionally) parsed.

        Args:
            query(Optional[dict]): Extra query parameters; keys and values are converted with str() before
                being URL-quoted, so non-string values such as integers are accepted
            parse(Optional[bool]): Parse the API response immediately

        Raises:
            requests.HTTPError: Propagated from ``raise_for_status()`` on an error status code
        """
        # Each extra parameter brings its own leading ampersand, so no parameters yields an empty string
        query_extra = ''.join(
            '&{key}={value}'.format(
                key=urllib.parse.quote(str(key)),
                value=urllib.parse.quote(str(value))
            )
            for key, value in (query or {}).items()
        )

        endpoint = self._api_url.replace(
            query='api_key={api_key}&filter=text&offset={offset}{extra}'.format(
                api_key=self._api_key, offset=self.offset, extra=query_extra
            )
        )

        response = self.session.get(endpoint.as_string())  # type: Response
        response.raise_for_status()

        self._api_response = response
        if parse:
            self._api_parse_response()

    def _api_parse_response(self):
        """
        Parse an API response

        Refreshes the blog metadata attributes and appends every post of the stored response to the internal
        post buffer. Posts whose container raises TumdlrParserError are dropped, and the offset is advanced
        for each dropped post so the next page request does not overlap the current one.
        """
        # Decode the JSON body once and reuse it for both the blog metadata and the posts
        payload = self._api_response.json()['response']
        blog = payload['blog']

        self.title = blog['title']
        self.url = URL(blog['url'])
        self.name = blog['name']
        self.description = blog['description']
        self.is_nsfw = blog['is_nsfw']
        self.likes = blog.get('likes', False)  # Returned only if sharing of likes is enabled
        self.post_count = blog['posts']
        self.updated = blog['updated']

        for post in payload['posts']:
            post_type = post['type']

            if post_type in ('photo', 'link'):
                container = TumblrPhotoSet
            elif post_type == 'video':
                container = TumblrVideoPost
            else:
                container = TumblrPost

            try:
                self._posts.append(container(post, self))
            except TumdlrParserError:
                # posts() only advances the offset for posts it yields; count the dropped one here
                self.offset += 1

    def posts(self):
        """
        Iterate over the blog's posts, requesting further pages from the API as the buffer runs dry.

        Iteration ends when a request neither buffers a post nor skips one.

        Yields:
            TumblrPost
        """
        while True:
            # Out of posts? Keep querying until we have some or the API has nothing more to give
            while not self._posts:
                offset_before = self.offset
                self._api_get()

                if not self._posts and self.offset == offset_before:
                    # Nothing was buffered and nothing was skipped: we've queried everything
                    return

            # Pop our next post and increment the offset
            post = self._posts.pop(0)
            self.offset += 1

            yield post