1
|
|
|
import urllib |
2
|
|
|
|
3
|
|
|
from requests import Session |
4
|
|
|
from yurl import URL |
5
|
|
|
|
6
|
|
|
from tumdlr import __version__ |
7
|
|
|
from tumdlr.containers import TumblrPost, TumblrPhotoSet, TumblrVideoPost |
8
|
|
|
from tumdlr.errors import TumdlrParserError |
9
|
|
|
|
10
|
|
|
|
11
|
|
|
class TumblrBlog:
    """
    A Tumblr blog read through the Tumblr v2 ``blog/{host}/posts`` API endpoint.

    Constructing an instance immediately issues one API request, which fills in the blog metadata
    attributes and buffers the first page of posts. Further pages are requested lazily by ``posts()``.
    """

    # Characters left untouched when percent-encoding the Referer header: the URL delimiters plus '%',
    # so the URL structure survives and already-encoded sequences are not encoded a second time.
    _REFERER_SAFE_CHARS = "/:?#[]@!$&'()*+,;=%"

    def __init__(self, url, session=None, **kwargs):
        """
        Tumblr blog

        Args:
            url(URL|str): Tumblr profile URL
            session(Optional[Session]): An optional custom Requests session. A caller-supplied session is
                used as-is; the default headers are only set on a session created here.

        Keyword Args:
            api_key(str): Tumblr API key
            uagent(str): Custom User-Agent header; a ``{version}`` placeholder is filled with the tumdlr
                version. The ``user_agent`` spelling is accepted as well.
        """
        # Imported locally: the module-level ``import urllib`` does not by itself load ``urllib.parse``.
        from urllib.parse import quote

        self._url = url if isinstance(url, URL) else URL(url)
        self._api_url = URL(scheme='https', host='api.tumblr.com', path='/v2/')
        self._api_response = None  # type: Response
        self._api_key = kwargs.get('api_key', 'fuiKNFp9vQFvjLNvx4sUwti4Yb5yGutBN4Xh10LXZhhRKjWlV4')
        # The documented keyword is ``uagent``; ``user_agent`` is what this code historically read, so
        # both are honoured (the documented name wins when both are given).
        self._uagent = kwargs.get('uagent', kwargs.get('user_agent', 'tumdlr/{version}'))

        if not session:
            session = Session()
            session.headers.update({
                # Only escape characters that are unsafe in a URL; quoting with the default safe set
                # would turn "http://" into "http%3A//".
                'Referer': quote(self._url.as_string(), safe=self._REFERER_SAFE_CHARS),
                'User-Agent': self._uagent.format(version=__version__)
            })

        self.session = session

        # Blog metadata, populated by _api_parse_response()
        self.title = None  # type: str
        self.url = None  # type: URL
        self.name = None  # type: str
        self.description = None  # type: str
        self.is_nsfw = None  # type: bool
        self.likes = None  # type: int|False
        self.post_count = None  # type: int
        self.updated = None  # type: int

        # Parsed posts waiting to be yielded by posts()
        self._posts = []
        # API offset of the next post to request: every post that was yielded or skipped so far
        self.offset = 0

        self._api_url = self._api_url.replace(
            path=self._api_url.path + 'blog/{host}/posts'.format(host=self._url.host)
        )
        self._api_get()

    def _api_get(self, query=None, parse=True):
        """
        Execute an API query

        Args:
            query(Optional[dict]): Extra query parameters; keys and values are converted to strings and
                percent-encoded
            parse(Optional[bool]): Parse the API response immediately

        Returns:
            Optional[int]: Number of posts contained in the response when ``parse`` is true, otherwise None

        Raises:
            requests.HTTPError: Propagated from ``raise_for_status()`` on an error status code
        """
        from urllib.parse import quote

        # Each extra parameter carries its own leading ampersand, so no parameters yields an empty string.
        # str() allows non-string values such as an integer limit.
        query_extra = ''.join(
            '&{key}={value}'.format(key=quote(str(key)), value=quote(str(value)))
            for key, value in (query or {}).items()
        )

        endpoint = self._api_url.replace(
            query='api_key={api_key}&filter=text&offset={offset}{extra}'.format(
                api_key=self._api_key, offset=self.offset, extra=query_extra
            )
        )

        response = self.session.get(endpoint.as_string())  # type: Response
        response.raise_for_status()

        self._api_response = response
        if parse:
            return self._api_parse_response()

        return None

    def _api_parse_response(self):
        """
        Parse an API response

        Updates the blog metadata attributes and appends every post that could be parsed to the internal
        post buffer. Posts that raise TumdlrParserError are skipped, but still advance ``offset`` so the
        next page request does not overlap with this one.

        Returns:
            int: Number of posts contained in the response, including skipped ones
        """
        # Decode the JSON body once and reuse it for both the blog and posts sections
        payload = self._api_response.json()['response']
        blog = payload['blog']

        self.title = blog['title']
        self.url = URL(blog['url'])
        self.name = blog['name']
        self.description = blog['description']
        self.is_nsfw = blog['is_nsfw']
        self.likes = blog.get('likes', False)  # Returned only if sharing of likes is enabled
        self.post_count = blog['posts']
        self.updated = blog['updated']

        posts = payload['posts']

        for post in posts:
            try:
                post_type = post['type']

                # NOTE(review): 'link' posts are handled by TumblrPhotoSet as well - confirm this is intended
                if post_type in ('photo', 'link'):
                    parsed = TumblrPhotoSet(post, self)
                elif post_type == 'video':
                    parsed = TumblrVideoPost(post, self)
                else:
                    parsed = TumblrPost(post, self)
            except TumdlrParserError:
                # The post is dropped, but it still occupies a slot in the API listing
                self.offset += 1
                continue

            self._posts.append(parsed)

        return len(posts)

    def posts(self):
        """
        Iterate over the blog's posts, requesting further pages from the API as the buffer runs dry.

        Yields:
            TumblrPost
        """
        while True:
            # Out of posts? Keep requesting pages until one contains a usable post. A page may consist
            # solely of posts that were skipped, in which case the following page is requested.
            while not self._posts:
                fetched = self._api_get()

                if not fetched and not self._posts:
                    # The API returned no posts at all, we've queried everything
                    return

            # Pop our next post and increment the offset
            post = self._posts.pop(0)
            self.offset += 1

            yield post
|
|
|
|
31
|
|
|
if not session: |
32
|
|
|
session = Session() |
33
|
|
|
session.headers.update({ |
34
|
|
|
'Referer': urllib.parse.quote(self._url.as_string()), |
35
|
|
|
'User-Agent': self._uagent.format(version=__version__) |
36
|
|
|
}) |
37
|
|
|
|
38
|
|
|
self.session = session |
39
|
|
|
|
40
|
|
|
self.title = None # type: str |
41
|
|
|
self.url = None # type: URL |
42
|
|
|
self.name = None # type: str |
43
|
|
|
self.description = None # type: str |
44
|
|
|
self.is_nsfw = None # type: bool |
45
|
|
|
self.likes = None # type: int|False |
46
|
|
|
self.post_count = None # type: int |
47
|
|
|
self.updated = None # type: int |
48
|
|
|
|
49
|
|
|
self._posts = [] |
50
|
|
|
self.offset = 0 |
51
|
|
|
|
52
|
|
|
self._api_url = self._api_url.replace( |
53
|
|
|
path=self._api_url.path + 'blog/{host}/posts'.format(host=self._url.host) |
54
|
|
|
) |
55
|
|
|
self._api_get() |
56
|
|
|
|
57
|
|
|
def _api_get(self, query=None, parse=True): |
58
|
|
|
""" |
59
|
|
|
Execute an API query |
60
|
|
|
|
61
|
|
|
Args: |
62
|
|
|
query(Optional[dict]): Extra query parameters |
63
|
|
|
parse(Optional[bool]): Parse the API response immediately |
64
|
|
|
""" |
65
|
|
|
# Parse extra query parameters |
66
|
|
|
query_extra = [] |
67
|
|
|
|
68
|
|
|
if query: |
69
|
|
|
for key, value in query.items(): |
70
|
|
|
query_extra.append( |
71
|
|
|
'{key}={value}'.format( |
72
|
|
|
key=urllib.parse.quote(key), |
73
|
|
|
value=urllib.parse.quote(value) |
74
|
|
|
) |
75
|
|
|
) |
76
|
|
|
|
77
|
|
|
# Only prepend an ampersand if we have extra attributes, otherwise default to an empty string |
78
|
|
|
if query_extra: |
79
|
|
|
query_extra = '&' + '&'.join(query_extra) |
80
|
|
|
else: |
81
|
|
|
query_extra = '' |
82
|
|
|
|
83
|
|
|
endpoint = self._api_url.replace( |
84
|
|
|
query='api_key={api_key}&filter=text&offset={offset}{extra}'.format( |
85
|
|
|
api_key=self._api_key, offset=self.offset, extra=query_extra |
86
|
|
|
) |
87
|
|
|
) |
88
|
|
|
|
89
|
|
|
response = self.session.get(endpoint.as_string()) # type: Response |
90
|
|
|
response.raise_for_status() |
91
|
|
|
|
92
|
|
|
self._api_response = response |
93
|
|
|
if parse: |
94
|
|
|
self._api_parse_response() |
95
|
|
|
|
96
|
|
|
def _api_parse_response(self): |
97
|
|
|
""" |
98
|
|
|
Parse an API response |
99
|
|
|
|
100
|
|
|
""" |
101
|
|
|
blog = self._api_response.json()['response']['blog'] |
102
|
|
|
|
103
|
|
|
self.title = blog['title'] |
104
|
|
|
self.url = URL(blog['url']) |
105
|
|
|
self.name = blog['name'] |
106
|
|
|
self.description = blog['description'] |
107
|
|
|
self.is_nsfw = blog['is_nsfw'] |
108
|
|
|
self.likes = blog.get('likes', False) # Returned only if sharing of likes is enabled |
109
|
|
|
self.post_count = blog['posts'] |
110
|
|
|
self.updated = blog['updated'] |
111
|
|
|
|
112
|
|
|
posts = self._api_response.json()['response']['posts'] |
113
|
|
|
|
114
|
|
|
for post in posts: |
115
|
|
|
try: |
116
|
|
|
if post['type'] in ['photo', 'link']: |
117
|
|
|
self._posts.append(TumblrPhotoSet(post, self)) |
118
|
|
|
continue |
119
|
|
|
elif post['type'] == 'video': |
120
|
|
|
self._posts.append(TumblrVideoPost(post, self)) |
121
|
|
|
continue |
122
|
|
|
|
123
|
|
|
self._posts.append(TumblrPost(post, self)) |
124
|
|
|
except TumdlrParserError: |
125
|
|
|
continue |
126
|
|
|
|
127
|
|
|
def posts(self): |
128
|
|
|
""" |
129
|
|
|
Yields: |
130
|
|
|
TumblrPost |
131
|
|
|
""" |
132
|
|
|
while True: |
133
|
|
|
# Out of posts? |
134
|
|
|
if not self._posts: |
135
|
|
|
# Do we have any more to query? |
136
|
|
|
self._api_get() |
137
|
|
|
|
138
|
|
|
if not self._posts: |
139
|
|
|
# Nope, we've queried everything, break now |
140
|
|
|
break |
141
|
|
|
|
142
|
|
|
# Pop our next post and increment the offset |
143
|
|
|
post = self._posts.pop(0) |
144
|
|
|
self.offset += 1 |
145
|
|
|
|
146
|
|
|
yield post |
147
|
|
|
|