Completed: push to master (39eece...c8d455) by Roy, created 01:11

BaseHandler.send_message()   A

Complexity
    Conditions: 1

Size
    Total Lines: 3

Duplication
    Lines: 0
    Ratio: 0%

Metric   Value
cc       1
dl       0
loc      3
rs       10
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2014-02-16 23:12:48

import sys
import inspect
import functools
import fractions

import six
from six import add_metaclass, iteritems

from pyspider.libs.url import (
    quote_chinese, _build_url, _encode_params,
    _encode_multipart_formdata, curl_to_arguments)
from pyspider.libs.utils import md5string
from pyspider.libs.ListIO import ListO
from pyspider.libs.response import rebuild_response
from pyspider.libs.pprint import pprint
from pyspider.processor import ProcessorResult

def catch_status_code_error(func):
    """
    A non-200 response is regarded as a fetch failure and is not passed to
    the callback. Use this decorator to override that behaviour.
    """
    func._catch_status_code_error = True
    return func

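# Sketch of the decorator's effect (illustrative; `_demo_error_page` is a
# hypothetical callback, not part of pyspider). The flag set above is what
# `BaseHandler._run_task` checks before calling `response.raise_for_status()`.
@catch_status_code_error
def _demo_error_page(self, response):
    return {'url': response.url, 'code': response.status_code}

assert _demo_error_page._catch_status_code_error is True
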
def not_send_status(func):
    """
    Do not send a process status package back to the scheduler.

    Used by callbacks such as on_message, on_result, etc.
    """
    @functools.wraps(func)
    def wrapper(self, response, task):
        self._extinfo['not_send_status'] = True
        function = func.__get__(self, self.__class__)
        return self._run_func(function, response, task)
    return wrapper

def config(_config=None, **kwargs):
    """
    A decorator for setting the default kwargs of `BaseHandler.crawl`.
    Any `self.crawl` call that uses the decorated method as its callback
    will apply this config.
    """
    if _config is None:
        _config = {}
    _config.update(kwargs)

    def wrapper(func):
        func._config = _config
        return func
    return wrapper

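# Sketch of what @config stores (illustrative; `_demo_index_page` is
# hypothetical). The kwargs are attached to the function and later merged
# into every self.crawl() call that names it as the callback.
@config(age=10 * 24 * 60 * 60)
def _demo_index_page(self, response):
    return response.url

assert _demo_index_page._config == {'age': 864000}
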
class NOTSET(object):
    pass


def every(minutes=NOTSET, seconds=NOTSET):
    """
    The decorated method will be called every `minutes` minutes or every
    `seconds` seconds.
    """
    def wrapper(func):
        # Mark the function with 'is_cronjob = True'; marked functions are
        # collected into Handler._cron_jobs by the metaclass.
        func.is_cronjob = True

        # Collect the interval and normalize it to seconds; it is used by
        # the metaclass. See the comments in the metaclass.
        func.tick = minutes * 60 + seconds
        return func

    if inspect.isfunction(minutes):
        func = minutes
        minutes = 1
        seconds = 0
        return wrapper(func)

    if minutes is NOTSET:
        if seconds is NOTSET:
            minutes = 1
            seconds = 0
        else:
            minutes = 0
    if seconds is NOTSET:
        seconds = 0

    return wrapper

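# Sketch of the mark set by @every (illustrative; `_demo_cron` is
# hypothetical). Intervals are normalized to seconds in `tick`; a bare
# `@every` with no arguments defaults to every minute.
@every(minutes=5, seconds=30)
def _demo_cron(self, response):
    pass

assert _demo_cron.is_cronjob is True
assert _demo_cron.tick == 5 * 60 + 30
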
class BaseHandlerMeta(type):

    def __new__(cls, name, bases, attrs):
        # A list of all functions that are marked with 'is_cronjob=True'
        cron_jobs = []

        # min_tick is the greatest common divisor (GCD) of the intervals of
        # the cron jobs. It is queried by the scheduler when the project is
        # first loaded: the scheduler need only send an _on_cronjob task
        # every min_tick seconds, which reduces the number of tasks sent.
        min_tick = 0

        for each in attrs.values():
            if inspect.isfunction(each) and getattr(each, 'is_cronjob', False):
                cron_jobs.append(each)
                min_tick = fractions.gcd(min_tick, each.tick)
        newcls = type.__new__(cls, name, bases, attrs)
        newcls._cron_jobs = cron_jobs
        newcls._min_tick = min_tick
        return newcls

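# Sketch of the min_tick computation (illustrative): with one cron job every
# 120 seconds and another every 300, the scheduler only needs to tick every
# gcd(120, 300) = 60 seconds. (fractions.gcd was current when this module
# was written; later Python versions moved it to math.gcd.)
assert fractions.gcd(fractions.gcd(0, 120), 300) == 60
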
@add_metaclass(BaseHandlerMeta)
class BaseHandler(object):
    """
    BaseHandler for all scripts.

    `BaseHandler.run` is the main method that handles a task.
    """
    crawl_config = {}
    project_name = None
    _cron_jobs = []
    _min_tick = 0
    __env__ = {'not_inited': True}
    retry_delay = {}

    def _reset(self):
        """
        Reset per-task state before each task.
        """
        self._extinfo = {}
        self._messages = []
        self._follows = []
        self._follows_keys = set()

    def _run_func(self, function, *arguments):
        """
        Run the callback with only as many arguments as it accepts.
        """
        args, varargs, keywords, defaults = inspect.getargspec(function)
        # `args` includes `self` for bound methods, hence len(args) - 1
        return function(*arguments[:len(args) - 1])

    def _run_task(self, task, response):
        """
        Find the callback specified by `task['callback']` and
        raise a status error for it if needed.
        """
        process = task.get('process', {})
        callback = process.get('callback', '__call__')
        if not hasattr(self, callback):
            raise NotImplementedError("self.%s() not implemented!" % callback)

        function = getattr(self, callback)
        # do not run the callback on a 304 Not Modified response
        if response.status_code == 304 and not getattr(function, '_catch_status_code_error', False):
            return None
        if not getattr(function, '_catch_status_code_error', False):
            response.raise_for_status()
        return self._run_func(function, response, task)

    def run_task(self, module, task, response):
        """
        Process the task, catching exceptions and logs; returns a
        `ProcessorResult` object.
        """
        logger = module.logger
        result = None
        exception = None
        stdout = sys.stdout
        self.task = task
        if isinstance(response, dict):
            response = rebuild_response(response)
        self.response = response
        self.save = (task.get('track') or {}).get('save', {})

        try:
            if self.__env__.get('enable_stdout_capture', True):
                sys.stdout = ListO(module.log_buffer)
            self._reset()
            result = self._run_task(task, response)
            if inspect.isgenerator(result):
                for r in result:
                    self._run_func(self.on_result, r, response, task)
            else:
                self._run_func(self.on_result, result, response, task)
        except Exception as e:
            logger.exception(e)
            exception = e
        finally:
            follows = self._follows
            messages = self._messages
            logs = list(module.log_buffer)
            extinfo = self._extinfo
            save = self.save

            sys.stdout = stdout
            self.task = None
            self.response = None
            self.save = None

        module.log_buffer[:] = []
        return ProcessorResult(result, follows, messages, logs, exception, extinfo, save)

    def _crawl(self, url, **kwargs):
        """
        The real crawl API.

        Checks kwargs and repacks them into the task's sub-dicts.
        """
        task = {}

        assert len(url) < 1024, "Maximum (1024) URL length error."

        if kwargs.get('callback'):
            callback = kwargs['callback']
            if isinstance(callback, six.string_types) and hasattr(self, callback):
                func = getattr(self, callback)
            elif six.callable(callback) and six.get_method_self(callback) is self:
                func = callback
                kwargs['callback'] = func.__name__
            else:
                raise NotImplementedError("self.%s() not implemented!" % callback)
            if hasattr(func, '_config'):
                for k, v in iteritems(func._config):
                    if isinstance(v, dict) and isinstance(kwargs.get(k), dict):
                        kwargs[k].update(v)
                    else:
                        kwargs.setdefault(k, v)

        for k, v in iteritems(self.crawl_config):
            if isinstance(v, dict) and isinstance(kwargs.get(k), dict):
                kwargs[k].update(v)
            else:
                kwargs.setdefault(k, v)

        url = quote_chinese(_build_url(url.strip(), kwargs.pop('params', None)))
        if kwargs.get('files'):
            assert isinstance(
                kwargs.get('data', {}), dict), "data must be a dict when using with files!"
            content_type, data = _encode_multipart_formdata(kwargs.pop('data', {}),
                                                            kwargs.pop('files', {}))
            kwargs.setdefault('headers', {})
            kwargs['headers']['Content-Type'] = content_type
            kwargs['data'] = data
        if kwargs.get('data'):
            kwargs['data'] = _encode_params(kwargs['data'])
        if kwargs.get('data'):
            kwargs.setdefault('method', 'POST')

        schedule = {}
        for key in ('priority', 'retries', 'exetime', 'age', 'itag', 'force_update',
                    'auto_recrawl'):
            if key in kwargs:
                schedule[key] = kwargs.pop(key)
        task['schedule'] = schedule

        fetch = {}
        for key in (
                'method',
                'headers',
                'data',
                'timeout',
                'allow_redirects',
                'cookies',
                'proxy',
                'etag',
                'last_modifed',  # (sic) presumably kept for backward compatibility
                'last_modified',
                'save',
                'js_run_at',
                'js_script',
                'js_viewport_width',
                'js_viewport_height',
                'load_images',
                'fetch_type',
                'use_gzip',
                'validate_cert',
                'max_redirects',
                'robots_txt'
        ):
            if key in kwargs:
                fetch[key] = kwargs.pop(key)
        task['fetch'] = fetch

        process = {}
        for key in ('callback', ):
            if key in kwargs:
                process[key] = kwargs.pop(key)
        task['process'] = process

        task['project'] = self.project_name
        task['url'] = url
        if 'taskid' in kwargs:
            task['taskid'] = kwargs.pop('taskid')
        else:
            task['taskid'] = self.get_taskid(task)

        if kwargs:
            raise TypeError('crawl() got unexpected keyword argument: %s' % kwargs.keys())

        cache_key = "%(project)s:%(taskid)s" % task
        if cache_key not in self._follows_keys:
            self._follows_keys.add(cache_key)
            self._follows.append(task)
        return task

    def get_taskid(self, task):
        '''Generate the taskid from task information; md5(url) by default. Override me.'''
        return md5string(task['url'])

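    # Override sketch (illustrative, not part of the original file; assumes
    # `import json`): include the POST data in the taskid so the same URL
    # fetched with different form data yields distinct tasks.
    #
    #     def get_taskid(self, task):
    #         return md5string(task['url'] +
    #                          json.dumps(task['fetch'].get('data', '')))
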
    # apis
    def crawl(self, url, **kwargs):
        '''
        available params:
          url
          callback

          method
          params
          data
          files
          headers
          timeout
          allow_redirects
          cookies
          proxy
          etag
          last_modified
          auto_recrawl

          fetch_type
          js_run_at
          js_script
          js_viewport_width
          js_viewport_height
          load_images

          priority
          retries
          exetime
          age
          itag

          save
          taskid

          full documentation: http://pyspider.readthedocs.org/en/latest/apis/self.crawl/
        '''

        if isinstance(url, six.string_types) and url.startswith('curl '):
            curl_kwargs = curl_to_arguments(url)
            url = curl_kwargs.pop('urls')
            for k, v in iteritems(curl_kwargs):
                kwargs.setdefault(k, v)

        if isinstance(url, six.string_types):
            return self._crawl(url, **kwargs)
        elif hasattr(url, "__iter__"):
            result = []
            for each in url:
                result.append(self._crawl(each, **kwargs))
            return result

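    # Usage sketch (illustrative; the URL and callback names are hypothetical):
    #
    #     def on_start(self):
    #         self.crawl('http://example.com/list', callback=self.index_page,
    #                    age=10 * 24 * 60 * 60)
    #
    #     def index_page(self, response):
    #         for each in response.doc('a[href^="http"]').items():
    #             self.crawl(each.attr.href, callback=self.index_page)
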
    def is_debugger(self):
        """Return True if running in the debugger."""
        return self.__env__.get('debugger')

    def send_message(self, project, msg, url='data:,on_message'):
        """Send a message to another project."""
        self._messages.append((project, msg, url))

    def on_message(self, project, msg):
        """Receive a message from another project. Override me."""
        pass

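    # Message-passing sketch (illustrative; the project and callback names
    # are hypothetical). A handler in project "producer" can push work to
    # project "consumer":
    #
    #     # in "producer":
    #     self.send_message('consumer', {'url': response.url})
    #
    #     # in "consumer":
    #     def on_message(self, project, msg):
    #         self.crawl(msg['url'], callback=self.detail_page)
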
    def on_result(self, result):
        """Receive results returned from other callbacks. Override me."""
        if not result:
            return
        assert self.task, "on_result can't be used outside a callback."
        if self.is_debugger():
            pprint(result)
        if self.__env__.get('result_queue'):
            self.__env__['result_queue'].put((self.task, result))

    @not_send_status
    def _on_message(self, response):
        project, msg = response.save
        return self.on_message(project, msg)

    @not_send_status
    def _on_cronjob(self, response, task):
        if (not response.save
                or not isinstance(response.save, dict)
                or 'tick' not in response.save):
            return

        # When triggered, an '_on_cronjob' task is sent from the scheduler
        # with 'tick' in response.save. The scheduler may send the trigger
        # task as often as every GCD of the intervals of the cron jobs, so
        # this method checks the tick against each cron job's own interval
        # to decide whether it is due.
        for cronjob in self._cron_jobs:
            if response.save['tick'] % cronjob.tick != 0:
                continue
            function = cronjob.__get__(self, self.__class__)
            self._run_func(function, response, task)

    def _on_get_info(self, response, task):
        """Send runtime information about this script."""
        for each in response.save or []:
            if each == 'min_tick':
                self.save[each] = self._min_tick
            elif each == 'retry_delay':
                if not isinstance(self.retry_delay, dict):
                    self.retry_delay = {'': self.retry_delay}
                self.save[each] = self.retry_delay

    @not_send_status
    def on_finished(self, response, task):
        pass
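
# Illustrative only (not part of the original module): a handler subclass can
# tune retry scheduling by overriding `retry_delay`, mapping the retry number
# to a delay in seconds; the '' key is the default for retry numbers not
# listed. `_DemoRetryHandler` is a hypothetical example.
class _DemoRetryHandler(BaseHandler):
    retry_delay = {
        0: 30,             # first retry after 30 seconds
        1: 1 * 60 * 60,    # second retry after an hour
        '': 24 * 60 * 60,  # all later retries after a day
    }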