#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
# http://binux.me
# Created on 2014-02-16 23:12:48

import sys
import inspect
import functools
import fractions

import six
from six import add_metaclass, iteritems

from pyspider.libs.url import (
    quote_chinese, _build_url, _encode_params,
    _encode_multipart_formdata, curl_to_arguments)
from pyspider.libs.utils import md5string, timeout
from pyspider.libs.ListIO import ListO
from pyspider.libs.response import rebuild_response
from pyspider.libs.pprint import pprint
from pyspider.processor import ProcessorResult


def catch_status_code_error(func):
    """
    Non-200 responses are regarded as fetch failures and are not passed to the callback.
    Use this decorator to override that behaviour.
    """
    func._catch_status_code_error = True
    return func


def not_send_status(func):
    """
    Do not send the process status package back to the scheduler.

    It's used by callbacks like on_message, on_result, etc.
    """
    @functools.wraps(func)
    def wrapper(self, response, task):
        self._extinfo['not_send_status'] = True
        function = func.__get__(self, self.__class__)
        return self._run_func(function, response, task)
    return wrapper


def config(_config=None, **kwargs):
    """
    A decorator for setting the default kwargs of `BaseHandler.crawl`.
    Any self.crawl with this callback will use this config.
    """
    if _config is None:
        _config = {}
    _config.update(kwargs)

    def wrapper(func):
        func._config = _config
        return func
    return wrapper


class NOTSET(object):
    pass


def every(minutes=NOTSET, seconds=NOTSET):
    """
    The decorated method will be called every `minutes` or `seconds`.
    """
    def wrapper(func):
        # mark the function with 'is_cronjob=True'; it will be collected into the
        # list Handler._cron_jobs by the metaclass
        func.is_cronjob = True

        # collect the interval and unify it to seconds; it's used by the metaclass.
        # See the comments in the metaclass.
        func.tick = minutes * 60 + seconds
        return func

    if inspect.isfunction(minutes):
        func = minutes
        minutes = 1
        seconds = 0
        return wrapper(func)

    if minutes is NOTSET:
        if seconds is NOTSET:
            minutes = 1
            seconds = 0
        else:
            minutes = 0
    if seconds is NOTSET:
        seconds = 0

    return wrapper


class BaseHandlerMeta(type):

    def __new__(cls, name, bases, attrs):
        # A list of all functions which are marked with 'is_cronjob=True'
        cron_jobs = []

        # min_tick is the greatest common divisor (GCD) of the cronjob intervals.
        # This value is queried by the scheduler when the project is first loaded;
        # the scheduler then only needs to send an _on_cronjob task every min_tick
        # seconds, which reduces the number of tasks it sends (e.g. cronjobs firing
        # every 60 and 90 seconds give a min_tick of 30 seconds).
        min_tick = 0

        for each in attrs.values():
            if inspect.isfunction(each) and getattr(each, 'is_cronjob', False):
                cron_jobs.append(each)
                min_tick = fractions.gcd(min_tick, each.tick)
        newcls = type.__new__(cls, name, bases, attrs)
        newcls._cron_jobs = cron_jobs
        newcls._min_tick = min_tick
        return newcls


@add_metaclass(BaseHandlerMeta)
class BaseHandler(object):
    """
    BaseHandler for all scripts.

    `BaseHandler.run` is the main method to handle the task.
    """
    crawl_config = {}
    project_name = None
    _cron_jobs = []
    _min_tick = 0
    __env__ = {'not_inited': True}
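    # retry_delay (assumed semantics; values below are illustrative): maps the
    # number of retries already made to a delay in seconds before the next attempt,
    # with '' as the default key, e.g. {0: 30, 1: 3600, '': 24 * 60 * 60}.
    # A plain number is also accepted; _on_get_info() wraps it as {'': value}.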
    retry_delay = {}

    def _reset(self):
        """
        Reset before each task.
        """
        self._extinfo = {}
        self._messages = []
        self._follows = []
        self._follows_keys = set()

    def _run_func(self, function, *arguments):
        """
        Run the callback with only as many arguments as its signature requests.
        """
        args, varargs, keywords, defaults = inspect.getargspec(function)
        task = arguments[-1]
        process_time_limit = task['process'].get('process_time_limit',
                                                 self.__env__.get('process_time_limit', 0))
        if process_time_limit > 0:
            with timeout(process_time_limit, 'process timeout'):
                ret = function(*arguments[:len(args) - 1])
        else:
            ret = function(*arguments[:len(args) - 1])
        return ret

    def _run_task(self, task, response):
        """
        Find the callback specified by `task['callback']`,
        raising a status error for it if needed.
        """
        process = task.get('process', {})
        callback = process.get('callback', '__call__')
        if not hasattr(self, callback):
            raise NotImplementedError("self.%s() not implemented!" % callback)

        function = getattr(self, callback)
        # do not run the callback on a 304 (not modified) response
        if response.status_code == 304 and not getattr(function, '_catch_status_code_error', False):
            return None
        if not getattr(function, '_catch_status_code_error', False):
            response.raise_for_status()
        return self._run_func(function, response, task)

    def run_task(self, module, task, response):
        """
        Process the task, catching exceptions and logs, and return a `ProcessorResult` object.
        """
        self.logger = logger = module.logger
        result = None
        exception = None
        stdout = sys.stdout
        self.task = task
        if isinstance(response, dict):
            response = rebuild_response(response)
        self.response = response
        self.save = (task.get('track') or {}).get('save', {})

        try:
            if self.__env__.get('enable_stdout_capture', True):
                sys.stdout = ListO(module.log_buffer)
            self._reset()
            result = self._run_task(task, response)
            if inspect.isgenerator(result):
                for r in result:
                    self._run_func(self.on_result, r, response, task)
            else:
                self._run_func(self.on_result, result, response, task)
        except Exception as e:
            logger.exception(e)
            exception = e
        finally:
            follows = self._follows
            messages = self._messages
            logs = list(module.log_buffer)
            extinfo = self._extinfo
            save = self.save

            sys.stdout = stdout
            self.task = None
            self.response = None
            self.save = None

        module.log_buffer[:] = []
        return ProcessorResult(result, follows, messages, logs, exception, extinfo, save)
220
|
|
|
schedule_fields = ('priority', 'retries', 'exetime', 'age', 'itag', 'force_update', 'auto_recrawl', 'cancel') |
221
|
|
|
fetch_fields = ('method', 'headers', 'user_agent', 'data', 'connect_timeout', 'timeout', 'allow_redirects', 'cookies', |
222
|
|
|
'proxy', 'etag', 'last_modifed', 'last_modified', 'save', 'js_run_at', 'js_script', |
223
|
|
|
'js_viewport_width', 'js_viewport_height', 'load_images', 'fetch_type', 'use_gzip', 'validate_cert', |
224
|
|
|
'max_redirects', 'robots_txt') |
225
|
|
|
process_fields = ('callback', 'process_time_limit') |
226
|
|
|
|
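    # Merge a project-wide crawl_config into a single task dict: values already set
    # on the task take precedence, and dict-valued fields (headers, cookies, ...) are
    # merged one level deep. Called from _crawl() when running under the debugger
    # (see is_debugger()).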
    @staticmethod
    def task_join_crawl_config(task, crawl_config):
        task_fetch = task.get('fetch', {})
        for k in BaseHandler.fetch_fields:
            if k in crawl_config:
                v = crawl_config[k]
                if isinstance(v, dict) and isinstance(task_fetch.get(k), dict):
                    v = dict(v)
                    v.update(task_fetch[k])
                    task_fetch[k] = v
                else:
                    task_fetch.setdefault(k, v)
        if task_fetch:
            task['fetch'] = task_fetch

        task_process = task.get('process', {})
        for k in BaseHandler.process_fields:
            if k in crawl_config:
                v = crawl_config[k]
                if isinstance(v, dict) and isinstance(task_process.get(k), dict):
                    task_process[k].update(v)
                else:
                    task_process.setdefault(k, v)
        if task_process:
            task['process'] = task_process

        return task

    def _crawl(self, url, **kwargs):
        """
        The real crawl API.

        Check kwargs and repack them into each sub-dict of the task.
        """
        task = {}

        assert len(url) < 1024, "Maximum (1024) URL length error."

        if kwargs.get('callback'):
            callback = kwargs['callback']
            if isinstance(callback, six.string_types) and hasattr(self, callback):
                func = getattr(self, callback)
            elif six.callable(callback) and six.get_method_self(callback) is self:
                func = callback
                kwargs['callback'] = func.__name__
            else:
                raise NotImplementedError("self.%s() not implemented!" % callback)
            if hasattr(func, '_config'):
                for k, v in iteritems(func._config):
                    if isinstance(v, dict) and isinstance(kwargs.get(k), dict):
                        kwargs[k].update(v)
                    else:
                        kwargs.setdefault(k, v)

        url = quote_chinese(_build_url(url.strip(), kwargs.pop('params', None)))
        if kwargs.get('files'):
            assert isinstance(
                kwargs.get('data', {}), dict), "data must be a dict when using with files!"
            content_type, data = _encode_multipart_formdata(kwargs.pop('data', {}),
                                                            kwargs.pop('files', {}))
            kwargs.setdefault('headers', {})
            kwargs['headers']['Content-Type'] = content_type
            kwargs['data'] = data
        if kwargs.get('data'):
            kwargs['data'] = _encode_params(kwargs['data'])
        if kwargs.get('data'):
            kwargs.setdefault('method', 'POST')

        if kwargs.get('user_agent'):
            kwargs.setdefault('headers', {})
            kwargs['headers']['User-Agent'] = kwargs.get('user_agent')

        schedule = {}
        for key in self.schedule_fields:
            if key in kwargs:
                schedule[key] = kwargs.pop(key)
            elif key in self.crawl_config:
                schedule[key] = self.crawl_config[key]

        task['schedule'] = schedule

        fetch = {}
        for key in self.fetch_fields:
            if key in kwargs:
                fetch[key] = kwargs.pop(key)
        task['fetch'] = fetch

        process = {}
        for key in self.process_fields:
            if key in kwargs:
                process[key] = kwargs.pop(key)
        task['process'] = process

        task['project'] = self.project_name
        task['url'] = url
        if 'taskid' in kwargs:
            task['taskid'] = kwargs.pop('taskid')
        else:
            task['taskid'] = self.get_taskid(task)

        if kwargs:
            raise TypeError('crawl() got unexpected keyword argument: %s' % kwargs.keys())

        if self.is_debugger():
            task = self.task_join_crawl_config(task, self.crawl_config)

        cache_key = "%(project)s:%(taskid)s" % task
        if cache_key not in self._follows_keys:
            self._follows_keys.add(cache_key)
            self._follows.append(task)
        return task

    def get_taskid(self, task):
        '''Generate a taskid from the task's information; md5(url) by default, override me.'''
        return md5string(task['url'])

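    # One possible override of get_taskid (illustrative; not part of this module):
    # include the POST data in the hash so that requests to the same URL with
    # different payloads become distinct tasks:
    #
    #     def get_taskid(self, task):
    #         import json
    #         return md5string(task['url'] + json.dumps(task['fetch'].get('data', '')))
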
    # apis
    def crawl(self, url, **kwargs):
        '''
        available params:
          url
          callback

          method
          params
          data
          files
          headers
          timeout
          allow_redirects
          cookies
          proxy
          etag
          last_modified
          auto_recrawl

          fetch_type
          js_run_at
          js_script
          js_viewport_width
          js_viewport_height
          load_images

          priority
          retries
          exetime
          age
          itag
          cancel

          save
          taskid

          full documentation: http://pyspider.readthedocs.org/en/latest/apis/self.crawl/
        '''
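        # Illustrative calls (URLs and callback name are assumptions): crawl()
        # accepts a single URL, an iterable of URLs, or a 'curl ...' command line:
        #
        #     self.crawl('http://example.com/', callback=self.index_page)
        #     self.crawl(['http://example.com/1', 'http://example.com/2'],
        #                callback=self.index_page)
        #     self.crawl('curl http://example.com/', callback=self.index_page)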

        if isinstance(url, six.string_types) and url.startswith('curl '):
            curl_kwargs = curl_to_arguments(url)
            url = curl_kwargs.pop('urls')
            for k, v in iteritems(curl_kwargs):
                kwargs.setdefault(k, v)

        if isinstance(url, six.string_types):
            return self._crawl(url, **kwargs)
        elif hasattr(url, "__iter__"):
            result = []
            for each in url:
                result.append(self._crawl(each, **kwargs))
            return result

    def is_debugger(self):
        """Return True if running in the debugger."""
        return self.__env__.get('debugger')

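    # Illustrative example of cross-project messaging (project name and payload
    # are assumptions): from a callback in one project,
    #
    #     self.send_message('other_project', {'url': response.url})
    #
    # and the receiving project handles it in its on_message(project, msg).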
    def send_message(self, project, msg, url='data:,on_message'):
        """Send messages to another project."""
        self._messages.append((project, msg, url))

    def on_message(self, project, msg):
        """Receive messages from another project; override me."""
        pass

    def on_result(self, result):
        """Receive returns from other callbacks; override me."""
        if not result:
            return
        assert self.task, "on_result can't be called outside a callback."
        if self.is_debugger():
            pprint(result)
        if self.__env__.get('result_queue'):
            self.__env__['result_queue'].put((self.task, result))

    def on_finished(self, response, task):
        """
        Triggered when all tasks in the task queue are finished.
        http://docs.pyspider.org/en/latest/About-Projects/#on_finished-callback
        """
        pass

    @not_send_status
    def _on_message(self, response):
        project, msg = response.save
        return self.on_message(project, msg)

    @not_send_status
    def _on_cronjob(self, response, task):
        if (not response.save
                or not isinstance(response.save, dict)
                or 'tick' not in response.save):
            return

        # When triggered, an '_on_cronjob' task is sent from the scheduler with
        # 'tick' in Response.save. The scheduler sends the trigger task at least
        # every GCD of the cronjob intervals, so this method checks the tick against
        # each cronjob function to confirm its execution interval.
        for cronjob in self._cron_jobs:
            if response.save['tick'] % cronjob.tick != 0:
                continue
            function = cronjob.__get__(self, self.__class__)
            self._run_func(function, response, task)

    def _on_get_info(self, response, task):
        """Send runtime information about this script."""
        for each in response.save or []:
            if each == 'min_tick':
                self.save[each] = self._min_tick
            elif each == 'retry_delay':
                if not isinstance(self.retry_delay, dict):
                    self.retry_delay = {'': self.retry_delay}
                self.save[each] = self.retry_delay
            elif each == 'crawl_config':
                self.save[each] = self.crawl_config