Completed: push to master (e6dbce...addc19) by Roy, created 02:40

BaseHandler._crawl()   F

Complexity
  Conditions: 25

Size
  Total Lines: 76

Duplication
  Lines: 0
  Ratio: 0 %

Importance
  Changes: 3
  Bugs: 1
  Features: 0

Metric   Value
cc       25
c        3
b        1
f        0
dl       0
loc      76
rs       2.4159

How to fix

Long Method

Small methods make your code easier to understand, particularly when combined with a good name. And if a method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a sign that the commented part should be extracted into a new method, with the comment serving as a starting point for the new method's name.

Commonly applied refactorings include Extract Method, as sketched below.
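The following is a minimal, self-contained illustration of that workflow; it is not taken from pyspider, and the report-card functions (report_before, average_score, report_after) are entirely hypothetical.

# Before: one function body with a comment marking a distinct step.
def report_before(scores):
    # compute the average score
    total = 0
    for s in scores:
        total += s
    average = total / len(scores) if scores else 0.0
    return "average: %.2f" % average


# After: the commented block becomes a function whose name comes from the comment.
def average_score(scores):
    """Extracted from the commented block above."""
    return sum(scores) / len(scores) if scores else 0.0


def report_after(scores):
    return "average: %.2f" % average_score(scores)


if __name__ == "__main__":
    print(report_before([3, 4, 5]))  # average: 4.00
    print(report_after([3, 4, 5]))   # average: 4.00

After the extraction the comment is gone, and its wording lives on as the helper's name, which is the point of the refactoring.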

Complexity

Complex classes and methods like BaseHandler._crawl() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate and is often faster. A rough sketch of how this idea maps onto _crawl() follows.
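As a purely illustrative sketch (the helper below is hypothetical and not how pyspider is actually structured): _crawl(), listed in full further down, repeats the same pop-known-keys-into-a-sub-dict loop for its schedule, fetch, and process field groups. Extracting that loop is a plausible first step before deciding whether a full Extract Class (for example, a separate task-builder object) pays off.

# Hypothetical helper only: factor the repeated "pop known keys into a
# sub-dict" loops out of a _crawl()-style method.
def extract_fields(kwargs, fields, defaults=None):
    """Pop every key in `fields` from kwargs into a new dict, falling back
    to `defaults` (e.g. crawl_config) when the key is absent."""
    defaults = defaults or {}
    section = {}
    for key in fields:
        if key in kwargs:
            section[key] = kwargs.pop(key)
        elif key in defaults:
            section[key] = defaults[key]
    return section


if __name__ == "__main__":
    kwargs = {'priority': 2, 'method': 'POST', 'callback': 'index_page'}
    schedule = extract_fields(kwargs, ('priority', 'retries'), defaults={'retries': 3})
    fetch = extract_fields(kwargs, ('method', 'headers'))
    process = extract_fields(kwargs, ('callback',))
    print(schedule)  # {'priority': 2, 'retries': 3}
    print(fetch)     # {'method': 'POST'}
    print(process)   # {'callback': 'index_page'}

Whether such a helper then grows into its own class is a judgment call; the sketch only shows that the three near-identical loops can share one named step.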

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2014-02-16 23:12:48

import sys
import inspect
import functools
import fractions

import six
from six import add_metaclass, iteritems

from pyspider.libs.url import (
    quote_chinese, _build_url, _encode_params,
    _encode_multipart_formdata, curl_to_arguments)
from pyspider.libs.utils import md5string
from pyspider.libs.ListIO import ListO
from pyspider.libs.response import rebuild_response
from pyspider.libs.pprint import pprint
from pyspider.processor import ProcessorResult


def catch_status_code_error(func):
    """
    Non-200 response will been regarded as fetch failed and will not pass to callback.
    Use this decorator to override this feature.
    """
    func._catch_status_code_error = True
    return func


def not_send_status(func):
    """
    Do not send process status package back to scheduler.

    It's used by callbacks like on_message, on_result etc...
    """
    @functools.wraps(func)
    def wrapper(self, response, task):
        self._extinfo['not_send_status'] = True
        function = func.__get__(self, self.__class__)
        return self._run_func(function, response, task)
    return wrapper


def config(_config=None, **kwargs):
    """
    A decorator for setting the default kwargs of `BaseHandler.crawl`.
    Any self.crawl with this callback will use this config.
    """
    if _config is None:
        _config = {}
    _config.update(kwargs)

    def wrapper(func):
        func._config = _config
        return func
    return wrapper


class NOTSET(object):
    pass


def every(minutes=NOTSET, seconds=NOTSET):
    """
    method will been called every minutes or seconds
    """
    def wrapper(func):
        # mark the function with variable 'is_cronjob=True', the function would be
        # collected into the list Handler._cron_jobs by meta class
        func.is_cronjob = True

        # collect interval and unify to seconds, it's used in meta class. See the
        # comments in meta class.
        func.tick = minutes * 60 + seconds
        return func

    if inspect.isfunction(minutes):
        func = minutes
        minutes = 1
        seconds = 0
        return wrapper(func)

    if minutes is NOTSET:
        if seconds is NOTSET:
            minutes = 1
            seconds = 0
        else:
            minutes = 0
    if seconds is NOTSET:
        seconds = 0

    return wrapper


class BaseHandlerMeta(type):

    def __new__(cls, name, bases, attrs):
        # A list of all functions which is marked as 'is_cronjob=True'
        cron_jobs = []

        # The min_tick is the greatest common divisor(GCD) of the interval of cronjobs
        # this value would be queried by scheduler when the project initial loaded.
        # Scheudler may only send _on_cronjob task every min_tick seconds. It can reduce
        # the number of tasks sent from scheduler.
        min_tick = 0

        for each in attrs.values():
            if inspect.isfunction(each) and getattr(each, 'is_cronjob', False):
                cron_jobs.append(each)
                min_tick = fractions.gcd(min_tick, each.tick)
        newcls = type.__new__(cls, name, bases, attrs)
        newcls._cron_jobs = cron_jobs
        newcls._min_tick = min_tick
        return newcls


@add_metaclass(BaseHandlerMeta)
class BaseHandler(object):
    """
    BaseHandler for all scripts.

    `BaseHandler.run` is the main method to handler the task.
    """
    crawl_config = {}
    project_name = None
    _cron_jobs = []
    _min_tick = 0
    __env__ = {'not_inited': True}
    retry_delay = {}

    def _reset(self):
        """
        reset before each task
        """
        self._extinfo = {}
        self._messages = []
        self._follows = []
        self._follows_keys = set()

    def _run_func(self, function, *arguments):
        """
        Running callback function with requested number of arguments
        """
        args, varargs, keywords, defaults = inspect.getargspec(function)
        return function(*arguments[:len(args) - 1])

    def _run_task(self, task, response):
        """
        Finding callback specified by `task['callback']`
        raising status error for it if needed.
        """
        process = task.get('process', {})
        callback = process.get('callback', '__call__')
        if not hasattr(self, callback):
            raise NotImplementedError("self.%s() not implemented!" % callback)

        function = getattr(self, callback)
        # do not run_func when 304
        if response.status_code == 304 and not getattr(function, '_catch_status_code_error', False):
            return None
        if not getattr(function, '_catch_status_code_error', False):
            response.raise_for_status()
        return self._run_func(function, response, task)

    def run_task(self, module, task, response):
        """
        Processing the task, catching exceptions and logs, return a `ProcessorResult` object
        """
        logger = module.logger
        result = None
        exception = None
        stdout = sys.stdout
        self.task = task
        if isinstance(response, dict):
            response = rebuild_response(response)
        self.response = response
        self.save = (task.get('track') or {}).get('save', {})

        try:
            if self.__env__.get('enable_stdout_capture', True):
                sys.stdout = ListO(module.log_buffer)
            self._reset()
            result = self._run_task(task, response)
            if inspect.isgenerator(result):
                for r in result:
                    self._run_func(self.on_result, r, response, task)
            else:
                self._run_func(self.on_result, result, response, task)
        except Exception as e:
            logger.exception(e)
            exception = e
        finally:
            follows = self._follows
            messages = self._messages
            logs = list(module.log_buffer)
            extinfo = self._extinfo
            save = self.save

            sys.stdout = stdout
            self.task = None
            self.response = None
            self.save = None

        module.log_buffer[:] = []
        return ProcessorResult(result, follows, messages, logs, exception, extinfo, save)

    schedule_fields = ('priority', 'retries', 'exetime', 'age', 'itag', 'force_update', 'auto_recrawl', 'cancel')
    fetch_fields = ('method', 'headers', 'data', 'connect_timeout', 'timeout', 'allow_redirects', 'cookies',
                    'proxy', 'etag', 'last_modifed', 'last_modified', 'save', 'js_run_at', 'js_script',
                    'js_viewport_width', 'js_viewport_height', 'load_images', 'fetch_type', 'use_gzip', 'validate_cert',
                    'max_redirects', 'robots_txt')
    process_fields = ('callback', )

    @staticmethod
    def task_join_crawl_config(task, crawl_config):
        task_fetch = task.get('fetch', {})
        for k in BaseHandler.fetch_fields:
            if k in crawl_config:
                v = crawl_config[k]
                if isinstance(v, dict) and isinstance(task_fetch.get(k), dict):
                    task_fetch[k].update(v)
                else:
                    task_fetch.setdefault(k, v)
        if task_fetch:
            task['fetch'] = task_fetch

        task_process = task.get('process', {})
        for k in BaseHandler.process_fields:
            if k in crawl_config:
                v = crawl_config[k]
                if isinstance(v, dict) and isinstance(task_process.get(k), dict):
                    task_process[k].update(v)
                else:
                    task_process.setdefault(k, v)
        if task_process:
            task['process'] = task_process

        return task

    def _crawl(self, url, **kwargs):
        """
        real crawl API

        checking kwargs, and repack them to each sub-dict
        """
        task = {}

        assert len(url) < 1024, "Maximum (1024) URL length error."

        if kwargs.get('callback'):
            callback = kwargs['callback']
            if isinstance(callback, six.string_types) and hasattr(self, callback):
                func = getattr(self, callback)
            elif six.callable(callback) and six.get_method_self(callback) is self:
                func = callback
                kwargs['callback'] = func.__name__
            else:
                raise NotImplementedError("self.%s() not implemented!" % callback)
            if hasattr(func, '_config'):
                for k, v in iteritems(func._config):
                    if isinstance(v, dict) and isinstance(kwargs.get(k), dict):
                        kwargs[k].update(v)
                    else:
                        kwargs.setdefault(k, v)

        url = quote_chinese(_build_url(url.strip(), kwargs.pop('params', None)))
        if kwargs.get('files'):
            assert isinstance(
                kwargs.get('data', {}), dict), "data must be a dict when using with files!"
            content_type, data = _encode_multipart_formdata(kwargs.pop('data', {}),
                                                            kwargs.pop('files', {}))
            kwargs.setdefault('headers', {})
            kwargs['headers']['Content-Type'] = content_type
            kwargs['data'] = data
        if kwargs.get('data'):
            kwargs['data'] = _encode_params(kwargs['data'])
        if kwargs.get('data'):
            kwargs.setdefault('method', 'POST')

        schedule = {}
        for key in self.schedule_fields:
            if key in kwargs:
                schedule[key] = kwargs.pop(key)
            elif key in self.crawl_config:
                schedule[key] = self.crawl_config[key]

        task['schedule'] = schedule

        fetch = {}
        for key in self.fetch_fields:
            if key in kwargs:
                fetch[key] = kwargs.pop(key)
        task['fetch'] = fetch

        process = {}
        for key in self.process_fields:
            if key in kwargs:
                process[key] = kwargs.pop(key)
        task['process'] = process

        task['project'] = self.project_name
        task['url'] = url
        if 'taskid' in kwargs:
            task['taskid'] = kwargs.pop('taskid')
        else:
            task['taskid'] = self.get_taskid(task)

        if kwargs:
            raise TypeError('crawl() got unexpected keyword argument: %s' % kwargs.keys())

        cache_key = "%(project)s:%(taskid)s" % task
        if cache_key not in self._follows_keys:
            self._follows_keys.add(cache_key)
            self._follows.append(task)
        return task

    def get_taskid(self, task):
        '''Generate taskid by information of task md5(url) by default, override me'''
        return md5string(task['url'])

    # apis
    def crawl(self, url, **kwargs):
        '''
        available params:
          url
          callback

          method
          params
          data
          files
          headers
          timeout
          allow_redirects
          cookies
          proxy
          etag
          last_modified
          auto_recrawl

          fetch_type
          js_run_at
          js_script
          js_viewport_width
          js_viewport_height
          load_images

          priority
          retries
          exetime
          age
          itag
          cancel

          save
          taskid

          full documents: http://pyspider.readthedocs.org/en/latest/apis/self.crawl/
        '''

        if isinstance(url, six.string_types) and url.startswith('curl '):
            curl_kwargs = curl_to_arguments(url)
            url = curl_kwargs.pop('urls')
            for k, v in iteritems(curl_kwargs):
                kwargs.setdefault(k, v)

        if isinstance(url, six.string_types):
            return self._crawl(url, **kwargs)
        elif hasattr(url, "__iter__"):
            result = []
            for each in url:
                result.append(self._crawl(each, **kwargs))
            return result

    def is_debugger(self):
        """Return true if running in debugger"""
        return self.__env__.get('debugger')

    def send_message(self, project, msg, url='data:,on_message'):
        """Send messages to other project."""
        self._messages.append((project, msg, url))

    def on_message(self, project, msg):
        """Receive message from other project, override me."""
        pass

    def on_result(self, result):
        """Receiving returns from other callback, override me."""
        if not result:
            return
        assert self.task, "on_result can't outside a callback."
        if self.is_debugger():
            pprint(result)
        if self.__env__.get('result_queue'):
            self.__env__['result_queue'].put((self.task, result))

    @not_send_status
    def _on_message(self, response):
        project, msg = response.save
        return self.on_message(project, msg)

    @not_send_status
    def _on_cronjob(self, response, task):
        if (not response.save
                or not isinstance(response.save, dict)
                or 'tick' not in response.save):
            return

        # When triggered, a '_on_cronjob' task is sent from scheudler with 'tick' in
        # Response.save. Scheduler may at least send the trigger task every GCD of the
        # inverval of the cronjobs. The method should check the tick for each cronjob
        # function to confirm the execute interval.
        for cronjob in self._cron_jobs:
            if response.save['tick'] % cronjob.tick != 0:
                continue
            function = cronjob.__get__(self, self.__class__)
            self._run_func(function, response, task)

    def _on_get_info(self, response, task):
        """Sending runtime infomation about this script."""
        for each in response.save or []:
            if each == 'min_tick':
                self.save[each] = self._min_tick
            elif each == 'retry_delay':
                if not isinstance(self.retry_delay, dict):
                    self.retry_delay = {'': self.retry_delay}
                self.save[each] = self.retry_delay
            elif each == 'crawl_config':
                self.save[each] = self.crawl_config

    @not_send_status
    def on_finished(self, response, task):
        pass
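For context, the decorators defined at the top of this file (@every, @config, @catch_status_code_error) and the crawl() API documented in the docstring above are normally used from a project handler script. The following is a minimal sketch based on the pyspider documentation linked in that docstring; the URL and callback names are illustrative and are not part of the analyzed code.

# Illustrative pyspider handler sketch (not part of the analyzed file).
from pyspider.libs.base_handler import BaseHandler, catch_status_code_error, config, every


class Handler(BaseHandler):
    crawl_config = {
        'headers': {'User-Agent': 'pyspider-example'},
    }

    @every(minutes=24 * 60)
    def on_start(self):
        # re-seeded once a day because of @every
        self.crawl('http://example.com/', callback=self.index_page)

    @config(age=10 * 24 * 60 * 60)
    def index_page(self, response):
        # every self.crawl() targeting this callback inherits age= from @config
        for each in response.doc('a[href]').items():
            self.crawl(each.attr.href, callback=self.detail_page)

    @catch_status_code_error
    def detail_page(self, response):
        # non-200 responses still reach this callback thanks to the decorator
        return {'url': response.url, 'title': response.doc('title').text()}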