BaseHandler.task_join_crawl_config()   F

Complexity: Conditions 11
Size: Total Lines 27
Duplication: Lines 0, Ratio 0 %
Importance: Changes 0

Metric   Value
cc       11
dl       0
loc      27
rs       3.1764
c        0
b        0
f        0

How to fix: Complexity

Complex methods like BaseHandler.task_join_crawl_config() often do a lot of different things. To break such a method down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
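The flagged member here is a single static method rather than a whole class, so the same idea applies at a smaller scale: pull the duplicated fetch/process merge loop out into a shared helper. A minimal sketch of that refactoring, using a hypothetical _merge_section helper that is not part of the project:

    @staticmethod
    def _merge_section(section, fields, crawl_config, task_wins=True):
        # Hypothetical helper: merge crawl_config defaults into one task
        # sub-dict ('fetch' or 'process'). Values already present in the
        # section are kept; dict values are merged key by key, with either
        # the task (task_wins=True) or crawl_config taking precedence.
        for k in fields:
            if k not in crawl_config:
                continue
            v = crawl_config[k]
            if isinstance(v, dict) and isinstance(section.get(k), dict):
                if task_wins:
                    merged = dict(v)
                    merged.update(section[k])
                    section[k] = merged
                else:
                    section[k].update(v)
            else:
                section.setdefault(k, v)
        return section

    @staticmethod
    def task_join_crawl_config(task, crawl_config):
        task_fetch = BaseHandler._merge_section(
            task.get('fetch', {}), BaseHandler.fetch_fields, crawl_config)
        if task_fetch:
            task['fetch'] = task_fetch

        task_process = BaseHandler._merge_section(
            task.get('process', {}), BaseHandler.process_fields, crawl_config,
            task_wins=False)
        if task_process:
            task['process'] = task_process
        return task

For reference, the full source of the analyzed module follows.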

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2014-02-16 23:12:48

import sys
import inspect
import functools
import fractions

import six
from six import add_metaclass, iteritems

from pyspider.libs.url import (
    quote_chinese, _build_url, _encode_params,
    _encode_multipart_formdata, curl_to_arguments)
from pyspider.libs.utils import md5string, timeout
from pyspider.libs.ListIO import ListO
from pyspider.libs.response import rebuild_response
from pyspider.libs.pprint import pprint
from pyspider.processor import ProcessorResult

def catch_status_code_error(func):
    """
    A non-200 response will be regarded as a fetch failure and will not be passed to the callback.
    Use this decorator to override this feature.
    """
    func._catch_status_code_error = True
    return func

def not_send_status(func):
    """
    Do not send process status package back to scheduler.

    It's used by callbacks like on_message, on_result etc...
    """
    @functools.wraps(func)
    def wrapper(self, response, task):
        self._extinfo['not_send_status'] = True
        function = func.__get__(self, self.__class__)
        return self._run_func(function, response, task)
    return wrapper

def config(_config=None, **kwargs):
    """
    A decorator for setting the default kwargs of `BaseHandler.crawl`.
    Any self.crawl with this callback will use this config.
    """
    if _config is None:
        _config = {}
    _config.update(kwargs)

    def wrapper(func):
        func._config = _config
        return func
    return wrapper

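# Usage note (illustrative, not part of this file): @config sets default kwargs
# for every self.crawl() call whose callback is the decorated method, e.g.:
#
#     @config(age=10 * 24 * 60 * 60)
#     def index_page(self, response):
#         ...
#
# A self.crawl(url, callback=self.index_page) call then defaults to that age
# unless the call passes its own value.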
class NOTSET(object):
    pass


def every(minutes=NOTSET, seconds=NOTSET):
    """
    The decorated method will be called every `minutes` or `seconds`.
    """
    def wrapper(func):
        # Mark the function with 'is_cronjob=True'; it will be collected
        # into the Handler._cron_jobs list by the metaclass.
        func.is_cronjob = True

        # Collect the interval and unify it to seconds; it is used by the
        # metaclass. See the comments in the metaclass.
        func.tick = minutes * 60 + seconds
        return func

    if inspect.isfunction(minutes):
        func = minutes
        minutes = 1
        seconds = 0
        return wrapper(func)

    if minutes is NOTSET:
        if seconds is NOTSET:
            minutes = 1
            seconds = 0
        else:
            minutes = 0
    if seconds is NOTSET:
        seconds = 0

    return wrapper

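# Usage note (illustrative, not part of this file): @every marks a method as a
# cron job, e.g. to re-crawl a seed page once a day:
#
#     @every(minutes=24 * 60)
#     def on_start(self):
#         self.crawl('http://example.com/', callback=self.index_page)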
class BaseHandlerMeta(type):

    def __new__(cls, name, bases, attrs):
        # A list of all functions marked with 'is_cronjob=True'
        cron_jobs = []

        # The min_tick is the greatest common divisor (GCD) of the intervals of the cron jobs.
        # This value is queried by the scheduler when the project is initially loaded.
        # The scheduler may only send the _on_cronjob task every min_tick seconds, which reduces
        # the number of tasks sent from the scheduler.
        min_tick = 0

        for each in attrs.values():
            if inspect.isfunction(each) and getattr(each, 'is_cronjob', False):
                cron_jobs.append(each)
                min_tick = fractions.gcd(min_tick, each.tick)
        newcls = type.__new__(cls, name, bases, attrs)
        newcls._cron_jobs = cron_jobs
        newcls._min_tick = min_tick
        return newcls

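# Worked example (illustrative, not part of this file): with two cron jobs
# declared as @every(minutes=5) (tick=300) and @every(seconds=180) (tick=180),
# the metaclass computes min_tick = gcd(gcd(0, 300), 180) = 60, so the
# scheduler only needs to send an _on_cronjob task every 60 seconds.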
@add_metaclass(BaseHandlerMeta)
class BaseHandler(object):
    """
    BaseHandler for all scripts.

    `BaseHandler.run` is the main method to handle the task.
    """
    crawl_config = {}
    project_name = None
    _cron_jobs = []
    _min_tick = 0
    __env__ = {'not_inited': True}
    retry_delay = {}

    def _reset(self):
        """
        reset before each task
        """
        self._extinfo = {}
        self._messages = []
        self._follows = []
        self._follows_keys = set()

    def _run_func(self, function, *arguments):
        """
        Running callback function with requested number of arguments
        """
        args, varargs, keywords, defaults = inspect.getargspec(function)
        task = arguments[-1]
        process_time_limit = task['process'].get('process_time_limit',
                                                 self.__env__.get('process_time_limit', 0))
        if process_time_limit > 0:
            with timeout(process_time_limit, 'process timeout'):
                ret = function(*arguments[:len(args) - 1])
        else:
            ret = function(*arguments[:len(args) - 1])
        return ret

    def _run_task(self, task, response):
        """
        Finding callback specified by `task['callback']`
        raising status error for it if needed.
        """
        process = task.get('process', {})
        callback = process.get('callback', '__call__')
        if not hasattr(self, callback):
            raise NotImplementedError("self.%s() not implemented!" % callback)

        function = getattr(self, callback)
        # do not run_func when 304
        if response.status_code == 304 and not getattr(function, '_catch_status_code_error', False):
            return None
        if not getattr(function, '_catch_status_code_error', False):
            response.raise_for_status()
        return self._run_func(function, response, task)

    def run_task(self, module, task, response):
        """
        Processing the task, catching exceptions and logs, return a `ProcessorResult` object
        """
        self.logger = logger = module.logger
        result = None
        exception = None
        stdout = sys.stdout
        self.task = task
        if isinstance(response, dict):
            response = rebuild_response(response)
        self.response = response
        self.save = (task.get('track') or {}).get('save', {})

        try:
            if self.__env__.get('enable_stdout_capture', True):
                sys.stdout = ListO(module.log_buffer)
            self._reset()
            result = self._run_task(task, response)
            if inspect.isgenerator(result):
                for r in result:
                    self._run_func(self.on_result, r, response, task)
            else:
                self._run_func(self.on_result, result, response, task)
        except Exception as e:
            logger.exception(e)
            exception = e
        finally:
            follows = self._follows
            messages = self._messages
            logs = list(module.log_buffer)
            extinfo = self._extinfo
            save = self.save

            sys.stdout = stdout
            self.task = None
            self.response = None
            self.save = None

        module.log_buffer[:] = []
        return ProcessorResult(result, follows, messages, logs, exception, extinfo, save)

    schedule_fields = ('priority', 'retries', 'exetime', 'age', 'itag', 'force_update', 'auto_recrawl', 'cancel')
    fetch_fields = ('method', 'headers', 'user_agent', 'data', 'connect_timeout', 'timeout', 'allow_redirects', 'cookies',
                    'proxy', 'etag', 'last_modifed', 'last_modified', 'save', 'js_run_at', 'js_script',
                    'js_viewport_width', 'js_viewport_height', 'load_images', 'fetch_type', 'use_gzip', 'validate_cert',
                    'max_redirects', 'robots_txt')
    process_fields = ('callback', 'process_time_limit')

    @staticmethod
    def task_join_crawl_config(task, crawl_config):
        task_fetch = task.get('fetch', {})
        for k in BaseHandler.fetch_fields:
            if k in crawl_config:
                v = crawl_config[k]
                if isinstance(v, dict) and isinstance(task_fetch.get(k), dict):
                    v = dict(v)
                    v.update(task_fetch[k])
                    task_fetch[k] = v
                else:
                    task_fetch.setdefault(k, v)
        if task_fetch:
            task['fetch'] = task_fetch

        task_process = task.get('process', {})
        for k in BaseHandler.process_fields:
            if k in crawl_config:
                v = crawl_config[k]
                if isinstance(v, dict) and isinstance(task_process.get(k), dict):
                    task_process[k].update(v)
                else:
                    task_process.setdefault(k, v)
        if task_process:
            task['process'] = task_process

        return task

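    # Behaviour note (illustrative, not part of this file): for fetch fields,
    # values already on the task win over crawl_config, and dict-valued fields
    # such as 'headers' are merged key by key. For example:
    #
    #     task = {'fetch': {'headers': {'X-Token': 'abc'}, 'timeout': 10}}
    #     crawl_config = {'headers': {'User-Agent': 'pyspider'}, 'timeout': 120}
    #     BaseHandler.task_join_crawl_config(task, crawl_config)
    #     # task['fetch'] == {'headers': {'User-Agent': 'pyspider', 'X-Token': 'abc'},
    #     #                   'timeout': 10}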
    def _crawl(self, url, **kwargs):
        """
        real crawl API

        checking kwargs, and repack them to each sub-dict
        """
        task = {}

        assert len(url) < 1024, "Maximum (1024) URL length error."

        if kwargs.get('callback'):
            callback = kwargs['callback']
            if isinstance(callback, six.string_types) and hasattr(self, callback):
                func = getattr(self, callback)
            elif six.callable(callback) and six.get_method_self(callback) is self:
                func = callback
                kwargs['callback'] = func.__name__
            else:
                raise NotImplementedError("self.%s() not implemented!" % callback)
            if hasattr(func, '_config'):
                for k, v in iteritems(func._config):
                    if isinstance(v, dict) and isinstance(kwargs.get(k), dict):
                        kwargs[k].update(v)
                    else:
                        kwargs.setdefault(k, v)

        url = quote_chinese(_build_url(url.strip(), kwargs.pop('params', None)))
        if kwargs.get('files'):
            assert isinstance(
                kwargs.get('data', {}), dict), "data must be a dict when using with files!"
            content_type, data = _encode_multipart_formdata(kwargs.pop('data', {}),
                                                            kwargs.pop('files', {}))
            kwargs.setdefault('headers', {})
            kwargs['headers']['Content-Type'] = content_type
            kwargs['data'] = data
        if kwargs.get('data'):
            kwargs['data'] = _encode_params(kwargs['data'])
        if kwargs.get('data'):
            kwargs.setdefault('method', 'POST')

        if kwargs.get('user_agent'):
            kwargs.setdefault('headers', {})
            kwargs['headers']['User-Agent'] = kwargs.get('user_agent')

        schedule = {}
        for key in self.schedule_fields:
            if key in kwargs:
                schedule[key] = kwargs.pop(key)
            elif key in self.crawl_config:
                schedule[key] = self.crawl_config[key]

        task['schedule'] = schedule

        fetch = {}
        for key in self.fetch_fields:
            if key in kwargs:
                fetch[key] = kwargs.pop(key)
        task['fetch'] = fetch

        process = {}
        for key in self.process_fields:
            if key in kwargs:
                process[key] = kwargs.pop(key)
        task['process'] = process

        task['project'] = self.project_name
        task['url'] = url
        if 'taskid' in kwargs:
            task['taskid'] = kwargs.pop('taskid')
        else:
            task['taskid'] = self.get_taskid(task)

        if kwargs:
            raise TypeError('crawl() got unexpected keyword argument: %s' % kwargs.keys())

        if self.is_debugger():
            task = self.task_join_crawl_config(task, self.crawl_config)

        cache_key = "%(project)s:%(taskid)s" % task
        if cache_key not in self._follows_keys:
            self._follows_keys.add(cache_key)
            self._follows.append(task)
        return task

    def get_taskid(self, task):
        '''Generate taskid by information of task md5(url) by default, override me'''
        return md5string(task['url'])

    # apis
    def crawl(self, url, **kwargs):
        '''
        available params:
          url
          callback

          method
          params
          data
          files
          headers
          timeout
          allow_redirects
          cookies
          proxy
          etag
          last_modified
          auto_recrawl

          fetch_type
          js_run_at
          js_script
          js_viewport_width
          js_viewport_height
          load_images

          priority
          retries
          exetime
          age
          itag
          cancel

          save
          taskid

          full documents: http://pyspider.readthedocs.org/en/latest/apis/self.crawl/
        '''

        if isinstance(url, six.string_types) and url.startswith('curl '):
            curl_kwargs = curl_to_arguments(url)
            url = curl_kwargs.pop('urls')
            for k, v in iteritems(curl_kwargs):
                kwargs.setdefault(k, v)

        if isinstance(url, six.string_types):
            return self._crawl(url, **kwargs)
        elif hasattr(url, "__iter__"):
            result = []
            for each in url:
                result.append(self._crawl(each, **kwargs))
            return result

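    # Usage note (illustrative, not part of this file): a typical callback chain
    # built with self.crawl in a handler script:
    #
    #     def on_start(self):
    #         self.crawl('http://example.com/', callback=self.index_page)
    #
    #     def index_page(self, response):
    #         for each in response.doc('a[href^="http"]').items():
    #             self.crawl(each.attr.href, callback=self.detail_page)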
    def is_debugger(self):
        """Return true if running in debugger"""
        return self.__env__.get('debugger')

    def send_message(self, project, msg, url='data:,on_message'):
        """Send messages to other project."""
        self._messages.append((project, msg, url))

    def on_message(self, project, msg):
        """Receive message from other project, override me."""
        pass

    def on_result(self, result):
        """Receiving returns from other callback, override me."""
        if not result:
            return
        assert self.task, "on_result can't be called outside a callback."
        if self.is_debugger():
            pprint(result)
        if self.__env__.get('result_queue'):
            self.__env__['result_queue'].put((self.task, result))

    def on_finished(self, response, task):
        """
        Triggered when all tasks in task queue finished.
        http://docs.pyspider.org/en/latest/About-Projects/#on_finished-callback
        """
        pass

    @not_send_status
    def _on_message(self, response):
        project, msg = response.save
        return self.on_message(project, msg)

    @not_send_status
    def _on_cronjob(self, response, task):
        if (not response.save
                or not isinstance(response.save, dict)
                or 'tick' not in response.save):
            return

        # When triggered, an '_on_cronjob' task is sent from the scheduler with
        # 'tick' in Response.save. The scheduler may send the trigger task only
        # every GCD of the intervals of the cron jobs, so this method checks the
        # tick against each cron job's interval before executing it.
        for cronjob in self._cron_jobs:
            if response.save['tick'] % cronjob.tick != 0:
                continue
            function = cronjob.__get__(self, self.__class__)
            self._run_func(function, response, task)

    def _on_get_info(self, response, task):
        """Sending runtime information about this script."""
        for each in response.save or []:
            if each == 'min_tick':
                self.save[each] = self._min_tick
            elif each == 'retry_delay':
                if not isinstance(self.retry_delay, dict):
                    self.retry_delay = {'': self.retry_delay}
                self.save[each] = self.retry_delay
            elif each == 'crawl_config':
                self.save[each] = self.crawl_config