#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
# http://binux.me
# Created on 2014-02-16 23:12:48

import sys
import inspect
import functools
import fractions

import six
from six import add_metaclass, iteritems

from pyspider.libs.url import (
    quote_chinese, _build_url, _encode_params,
    _encode_multipart_formdata, curl_to_arguments)
from pyspider.libs.utils import md5string, timeout
from pyspider.libs.ListIO import ListO
from pyspider.libs.response import rebuild_response
from pyspider.libs.pprint import pprint
from pyspider.processor import ProcessorResult


def catch_status_code_error(func):
    """
    A non-200 response is regarded as a fetch failure and is not passed to
    the callback. Use this decorator to override that behavior.
    """
    func._catch_status_code_error = True
    return func
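
# Illustrative usage (a sketch; the handler and result fields are
# hypothetical): a callback decorated with @catch_status_code_error still
# receives non-200 responses, so it can handle e.g. a 404 itself.
#
#     class Handler(BaseHandler):
#         @catch_status_code_error
#         def detail_page(self, response):
#             if response.status_code == 404:
#                 return {'url': response.url, 'deleted': True}
#             return {'url': response.url, 'title': response.doc('title').text()}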


def not_send_status(func):
    """
    Do not send the process status package back to the scheduler.

    It's used by callbacks like on_message, on_result, etc.
    """
    @functools.wraps(func)
    def wrapper(self, response, task):
        self._extinfo['not_send_status'] = True
        function = func.__get__(self, self.__class__)
        return self._run_func(function, response, task)
    return wrapper


def config(_config=None, **kwargs):
    """
    A decorator for setting the default kwargs of `BaseHandler.crawl`.
    Any `self.crawl` call with this callback will use this config.
    """
    if _config is None:
        _config = {}
    _config.update(kwargs)

    def wrapper(func):
        func._config = _config
        return func
    return wrapper
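
# Illustrative usage (a sketch; names and values are hypothetical): any
# self.crawl() that targets the decorated callback inherits these kwargs
# unless the call overrides them.
#
#     class Handler(BaseHandler):
#         @config(age=10 * 24 * 60 * 60, priority=2)
#         def detail_page(self, response):
#             return {'url': response.url}
#
#         def index_page(self, response):
#             # inherits age/priority from detail_page's @config
#             self.crawl('http://example.com/item/1', callback=self.detail_page)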


class NOTSET(object):
    pass


def every(minutes=NOTSET, seconds=NOTSET):
    """
    The decorated method will be called every `minutes` minutes or `seconds`
    seconds (default: every 1 minute).
    """
    def wrapper(func):
        # mark the function with 'is_cronjob=True'; the function will be
        # collected into the list Handler._cron_jobs by the meta class
        func.is_cronjob = True

        # collect the interval, unified to seconds; it's used in the meta
        # class. See the comments in the meta class.
        func.tick = minutes * 60 + seconds
        return func

    if inspect.isfunction(minutes):
        func = minutes
        minutes = 1
        seconds = 0
        return wrapper(func)

    if minutes is NOTSET:
        if seconds is NOTSET:
            minutes = 1
            seconds = 0
        else:
            minutes = 0
    if seconds is NOTSET:
        seconds = 0

    return wrapper
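
# Illustrative usage (a sketch; names and intervals are hypothetical):
#
#     class Handler(BaseHandler):
#         @every(minutes=24 * 60)     # tick = 86400s: run once a day
#         def on_start(self):
#             self.crawl('http://example.com/', callback=self.index_page)
#
#         @every(seconds=30)          # tick = 30s
#         def check_feed(self):
#             self.crawl('http://example.com/feed', callback=self.index_page)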


class BaseHandlerMeta(type):

    def __new__(cls, name, bases, attrs):
        # A list of all functions marked with 'is_cronjob=True'
        cron_jobs = []

        # min_tick is the greatest common divisor (GCD) of the cronjob
        # intervals. The scheduler queries this value when the project is
        # initially loaded, and may send an _on_cronjob task only every
        # min_tick seconds, which reduces the number of tasks sent from
        # the scheduler.
        min_tick = 0

        for each in attrs.values():
            if inspect.isfunction(each) and getattr(each, 'is_cronjob', False):
                cron_jobs.append(each)
                min_tick = fractions.gcd(min_tick, each.tick)
        newcls = type.__new__(cls, name, bases, attrs)
        newcls._cron_jobs = cron_jobs
        newcls._min_tick = min_tick
        return newcls
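
# Worked example of the min_tick computation above (numbers hypothetical):
# with two cronjobs of tick 300s (@every(minutes=5)) and 120s
# (@every(minutes=2)), gcd(0, 300) == 300 and gcd(300, 120) == 60, so the
# scheduler only needs to send an _on_cronjob task every 60 seconds.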


@add_metaclass(BaseHandlerMeta)
class BaseHandler(object):
    """
    BaseHandler for all scripts.

    `BaseHandler.run_task` is the main method for handling a task.
    """
    crawl_config = {}
    project_name = None
    _cron_jobs = []
    _min_tick = 0
    __env__ = {'not_inited': True}
    retry_delay = {}

    def _reset(self):
        """
        Reset per-task state before each task.
        """
        self._extinfo = {}
        self._messages = []
        self._follows = []
        self._follows_keys = set()

    def _run_func(self, function, *arguments):
        """
        Run the callback function with the number of arguments it requests.
        """
        args, varargs, keywords, defaults = inspect.getargspec(function)
        task = arguments[-1]
        process_time_limit = task['process'].get('process_time_limit',
                                                 self.__env__.get('process_time_limit', 0))
        if process_time_limit > 0:
            with timeout(process_time_limit, 'process timeout'):
                ret = function(*arguments[:len(args) - 1])
        else:
            ret = function(*arguments[:len(args) - 1])
        return ret
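
    # Sketch of the trimming above: getargspec() on a bound method still
    # lists `self`, so `arguments[:len(args) - 1]` forwards exactly as many
    # positional arguments as the callback declares. E.g. on_result(self,
    # result) invoked via _run_func(self.on_result, result, response, task)
    # receives only `result`; a callback declared as f(self, response, task)
    # receives both `response` and `task`.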

    def _run_task(self, task, response):
        """
        Find the callback specified by `task['callback']` and run it,
        raising a status error for it if needed.
        """
        process = task.get('process', {})
        callback = process.get('callback', '__call__')
        if not hasattr(self, callback):
            raise NotImplementedError("self.%s() not implemented!" % callback)

        function = getattr(self, callback)
        # do not run the callback when the response is a 304 Not Modified
        if response.status_code == 304 and not getattr(function, '_catch_status_code_error', False):
            return None
        if not getattr(function, '_catch_status_code_error', False):
            response.raise_for_status()
        return self._run_func(function, response, task)

    def run_task(self, module, task, response):
        """
        Process the task, catching exceptions and logs; returns a
        `ProcessorResult` object.
        """
        self.logger = logger = module.logger
        result = None
        exception = None
        stdout = sys.stdout
        self.task = task
        if isinstance(response, dict):
            response = rebuild_response(response)
        self.response = response
        self.save = (task.get('track') or {}).get('save', {})

        try:
            if self.__env__.get('enable_stdout_capture', True):
                sys.stdout = ListO(module.log_buffer)
            self._reset()
            result = self._run_task(task, response)
            if inspect.isgenerator(result):
                for r in result:
                    self._run_func(self.on_result, r, response, task)
            else:
                self._run_func(self.on_result, result, response, task)
        except Exception as e:
            logger.exception(e)
            exception = e
        finally:
            follows = self._follows
            messages = self._messages
            logs = list(module.log_buffer)
            extinfo = self._extinfo
            save = self.save

            sys.stdout = stdout
            self.task = None
            self.response = None
            self.save = None

        module.log_buffer[:] = []
        return ProcessorResult(result, follows, messages, logs, exception, extinfo, save)

    schedule_fields = ('priority', 'retries', 'exetime', 'age', 'itag', 'force_update', 'auto_recrawl', 'cancel')
    # 'last_modifed' (sic) is kept alongside the corrected 'last_modified',
    # presumably for backward compatibility with scripts using the old typo.
    fetch_fields = ('method', 'headers', 'user_agent', 'data', 'connect_timeout', 'timeout', 'allow_redirects', 'cookies',
                    'proxy', 'etag', 'last_modifed', 'last_modified', 'save', 'js_run_at', 'js_script',
                    'js_viewport_width', 'js_viewport_height', 'load_images', 'fetch_type', 'use_gzip', 'validate_cert',
                    'max_redirects', 'robots_txt')
    process_fields = ('callback', 'process_time_limit')

    @staticmethod
    def task_join_crawl_config(task, crawl_config):
        """
        Merge `crawl_config` defaults into the task's fetch/process
        sub-dicts; values already set on the task are generally kept.
        """
        task_fetch = task.get('fetch', {})
        for k in BaseHandler.fetch_fields:
            if k in crawl_config:
                v = crawl_config[k]
                if isinstance(v, dict) and isinstance(task_fetch.get(k), dict):
                    v = dict(v)
                    v.update(task_fetch[k])
                    task_fetch[k] = v
                else:
                    task_fetch.setdefault(k, v)
        if task_fetch:
            task['fetch'] = task_fetch

        task_process = task.get('process', {})
        for k in BaseHandler.process_fields:
            if k in crawl_config:
                v = crawl_config[k]
                if isinstance(v, dict) and isinstance(task_process.get(k), dict):
                    task_process[k].update(v)
                else:
                    task_process.setdefault(k, v)
        if task_process:
            task['process'] = task_process

        return task
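
    # Merge semantics in brief (hypothetical values): per-task fetch settings
    # win, and dict-valued fetch fields are merged one level deep.
    #
    #     task = {'fetch': {'headers': {'X-A': '1'}}}
    #     crawl_config = {'headers': {'X-A': '0', 'X-B': '2'}, 'timeout': 60}
    #     BaseHandler.task_join_crawl_config(task, crawl_config)
    #     # task['fetch'] == {'headers': {'X-A': '1', 'X-B': '2'},
    #     #                   'timeout': 60}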

    def _crawl(self, url, **kwargs):
        """
        The real crawl API.

        Checks kwargs and repacks them into the schedule/fetch/process
        sub-dicts of the task.
        """
        task = {}

        assert len(url) < 1024, "Maximum (1024) URL length error."

        if kwargs.get('callback'):
            callback = kwargs['callback']
            if isinstance(callback, six.string_types) and hasattr(self, callback):
                func = getattr(self, callback)
            elif six.callable(callback) and six.get_method_self(callback) is self:
                func = callback
                kwargs['callback'] = func.__name__
            else:
                raise NotImplementedError("self.%s() not implemented!" % callback)
            if hasattr(func, '_config'):
                for k, v in iteritems(func._config):
                    if isinstance(v, dict) and isinstance(kwargs.get(k), dict):
                        kwargs[k].update(v)
                    else:
                        kwargs.setdefault(k, v)

        url = quote_chinese(_build_url(url.strip(), kwargs.pop('params', None)))
        if kwargs.get('files'):
            assert isinstance(
                kwargs.get('data', {}), dict), "data must be a dict when used with files!"
            content_type, data = _encode_multipart_formdata(kwargs.pop('data', {}),
                                                            kwargs.pop('files', {}))
            kwargs.setdefault('headers', {})
            kwargs['headers']['Content-Type'] = content_type
            kwargs['data'] = data
        if kwargs.get('data'):
            kwargs['data'] = _encode_params(kwargs['data'])
        if kwargs.get('data'):
            kwargs.setdefault('method', 'POST')

        if kwargs.get('user_agent'):
            kwargs.setdefault('headers', {})
            kwargs['headers']['User-Agent'] = kwargs.get('user_agent')

        schedule = {}
        for key in self.schedule_fields:
            if key in kwargs:
                schedule[key] = kwargs.pop(key)
            elif key in self.crawl_config:
                schedule[key] = self.crawl_config[key]

        task['schedule'] = schedule

        fetch = {}
        for key in self.fetch_fields:
            if key in kwargs:
                fetch[key] = kwargs.pop(key)
        task['fetch'] = fetch

        process = {}
        for key in self.process_fields:
            if key in kwargs:
                process[key] = kwargs.pop(key)
        task['process'] = process

        task['project'] = self.project_name
        task['url'] = url
        if 'taskid' in kwargs:
            task['taskid'] = kwargs.pop('taskid')
        else:
            task['taskid'] = self.get_taskid(task)

        if kwargs:
            raise TypeError('crawl() got unexpected keyword argument: %s' % kwargs.keys())

        if self.is_debugger():
            task = self.task_join_crawl_config(task, self.crawl_config)

        cache_key = "%(project)s:%(taskid)s" % task
        if cache_key not in self._follows_keys:
            self._follows_keys.add(cache_key)
            self._follows.append(task)
        return task

    def get_taskid(self, task):
        '''Generate the taskid from task information; md5(url) by default. Override me.'''
        return md5string(task['url'])
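
    # Illustrative override (a sketch): when the same URL is POSTed with
    # different payloads, include the data in the taskid so such tasks are
    # not deduplicated against each other.
    #
    #     import json
    #
    #     class Handler(BaseHandler):
    #         def get_taskid(self, task):
    #             return md5string(task['url'] +
    #                              json.dumps(task['fetch'].get('data', '')))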

    # apis
    def crawl(self, url, **kwargs):
        '''
        available params:
          url
          callback

          method
          params
          data
          files
          headers
          timeout
          allow_redirects
          cookies
          proxy
          etag
          last_modified
          auto_recrawl

          fetch_type
          js_run_at
          js_script
          js_viewport_width
          js_viewport_height
          load_images

          priority
          retries
          exetime
          age
          itag
          cancel

          save
          taskid

        full documentation: http://pyspider.readthedocs.org/en/latest/apis/self.crawl/
        '''

        if isinstance(url, six.string_types) and url.startswith('curl '):
            curl_kwargs = curl_to_arguments(url)
            url = curl_kwargs.pop('urls')
            for k, v in iteritems(curl_kwargs):
                kwargs.setdefault(k, v)

        if isinstance(url, six.string_types):
            return self._crawl(url, **kwargs)
        elif hasattr(url, "__iter__"):
            result = []
            for each in url:
                result.append(self._crawl(each, **kwargs))
            return result
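
    # Illustrative calls (a sketch; URLs and callbacks are hypothetical):
    #
    #     self.crawl('http://example.com/', callback=self.index_page)
    #     self.crawl('http://example.com/search', method='POST',
    #                data={'q': 'pyspider'}, callback=self.index_page)
    #     # a list of URLs (or a 'curl ...' command line) is also accepted:
    #     self.crawl(['http://example.com/a', 'http://example.com/b'],
    #                callback=self.index_page)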

    def is_debugger(self):
        """Return True if running in the debugger."""
        return self.__env__.get('debugger')

    def send_message(self, project, msg, url='data:,on_message'):
        """Send a message to another project."""
        self._messages.append((project, msg, url))

    def on_message(self, project, msg):
        """Receive a message from another project; override me."""
        pass
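
    # Illustrative round trip (a sketch; the project name is hypothetical):
    # a handler in another project calls
    #     self.send_message('this_project', {'refresh': True})
    # and this project receives it by overriding:
    #
    #     def on_message(self, project, msg):
    #         if msg.get('refresh'):
    #             self.crawl('http://example.com/', callback=self.index_page)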

    def on_result(self, result):
        """Receive the return value of a callback; override me."""
        if not result:
            return
        assert self.task, "on_result can't be called outside a callback."
        if self.is_debugger():
            pprint(result)
        if self.__env__.get('result_queue'):
            self.__env__['result_queue'].put((self.task, result))

    def on_finished(self, response, task):
        """
        Triggered when all tasks in the task queue are finished.
        http://docs.pyspider.org/en/latest/About-Projects/#on_finished-callback
        """
        pass

    @not_send_status
    def _on_message(self, response):
        project, msg = response.save
        return self.on_message(project, msg)

    @not_send_status
    def _on_cronjob(self, response, task):
        if (not response.save
                or not isinstance(response.save, dict)
                or 'tick' not in response.save):
            return

        # When triggered, an '_on_cronjob' task is sent from the scheduler
        # with 'tick' in response.save. The scheduler may send the trigger
        # task as often as every GCD of the cronjob intervals, so this
        # method checks the tick against each cronjob's own interval to
        # confirm its execution interval.
        for cronjob in self._cron_jobs:
            if response.save['tick'] % cronjob.tick != 0:
                continue
            function = cronjob.__get__(self, self.__class__)
            self._run_func(function, response, task)

    def _on_get_info(self, response, task):
        """Send runtime information about this script."""
        for each in response.save or []:
            if each == 'min_tick':
                self.save[each] = self._min_tick
            elif each == 'retry_delay':
                if not isinstance(self.retry_delay, dict):
                    self.retry_delay = {'': self.retry_delay}
                self.save[each] = self.retry_delay
            elif each == 'crawl_config':
                self.save[each] = self.crawl_config