| Metric | Value |
| --- | --- |
| Total Complexity | 69 |
| Total Lines | 306 |
| Duplicated Lines | 0 % |
Complex classes like `pyspider.libs.BaseHandler` often do a lot of different things. To break such a class down, we need to identify a cohesive component within the class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined which fields and methods belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often the faster change.
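For example, the keyword-argument repacking in `_crawl` below (building the `schedule`, `fetch`, and `process` sub-dicts) is one such cohesive component. A minimal sketch of what an extracted collaborator could look like; the class and method names here are hypothetical and not part of pyspider:

```python
class TaskFieldPacker(object):
    """Hypothetical class extracted from BaseHandler._crawl: moves crawl()
    keyword arguments into the task's 'schedule' and 'process' sub-dicts."""

    SCHEDULE_KEYS = ('priority', 'retries', 'exetime', 'age', 'itag',
                     'force_update', 'auto_recrawl')
    PROCESS_KEYS = ('callback',)

    def pack(self, kwargs):
        # Consume the recognised keys, leaving everything else in kwargs so
        # the caller can still detect unexpected arguments.
        return {
            'schedule': self._take(kwargs, self.SCHEDULE_KEYS),
            'process': self._take(kwargs, self.PROCESS_KEYS),
        }

    @staticmethod
    def _take(kwargs, keys):
        return {key: kwargs.pop(key) for key in keys if key in kwargs}
```

With `BaseHandler` delegating to such an object, `_crawl` shrinks and the packing rules become independently testable. The listing flagged by the report follows.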
```python
#!/usr/bin/env python
# ... (intervening module code omitted in this report)


@add_metaclass(BaseHandlerMeta)
class BaseHandler(object):
    """
    BaseHandler for all scripts.

    `BaseHandler.run` is the main method to handle the task.
    """
    crawl_config = {}
    project_name = None
    _cron_jobs = []
    _min_tick = 0
    __env__ = {'not_inited': True}
    retry_delay = {}

    def _reset(self):
        """
        reset before each task
        """
        self._extinfo = {}
        self._messages = []
        self._follows = []
        self._follows_keys = set()

    def _run_func(self, function, *arguments):
        """
        Running callback function with requested number of arguments
        """
        args, varargs, keywords, defaults = inspect.getargspec(function)
        return function(*arguments[:len(args) - 1])

    def _run_task(self, task, response):
        """
        Finding callback specified by `task['callback']`,
        raising status error for it if needed.
        """
        process = task.get('process', {})
        callback = process.get('callback', '__call__')
        if not hasattr(self, callback):
            raise NotImplementedError("self.%s() not implemented!" % callback)

        function = getattr(self, callback)
        # do not run_func when 304
        if response.status_code == 304 and not getattr(function, '_catch_status_code_error', False):
            return None
        if not getattr(function, '_catch_status_code_error', False):
            response.raise_for_status()
        return self._run_func(function, response, task)

    def run_task(self, module, task, response):
        """
        Processing the task, catching exceptions and logs, return a `ProcessorResult` object
        """
        logger = module.logger
        result = None
        exception = None
        stdout = sys.stdout
        self.task = task
        if isinstance(response, dict):
            response = rebuild_response(response)
        self.response = response
        self.save = (task.get('track') or {}).get('save', {})

        try:
            if self.__env__.get('enable_stdout_capture', True):
                sys.stdout = ListO(module.log_buffer)
            self._reset()
            result = self._run_task(task, response)
            if inspect.isgenerator(result):
                for r in result:
                    self._run_func(self.on_result, r, response, task)
            else:
                self._run_func(self.on_result, result, response, task)
        except Exception as e:
            logger.exception(e)
            exception = e
        finally:
            follows = self._follows
            messages = self._messages
            logs = list(module.log_buffer)
            extinfo = self._extinfo
            save = self.save

            sys.stdout = stdout
            self.task = None
            self.response = None
            self.save = None

            module.log_buffer[:] = []
        return ProcessorResult(result, follows, messages, logs, exception, extinfo, save)

    def _crawl(self, url, **kwargs):
        """
        real crawl API

        checking kwargs, and repack them to each sub-dict
        """
        task = {}

        assert len(url) < 1024, "Maximum (1024) URL length error."

        if kwargs.get('callback'):
            callback = kwargs['callback']
            if isinstance(callback, six.string_types) and hasattr(self, callback):
                func = getattr(self, callback)
            elif six.callable(callback) and six.get_method_self(callback) is self:
                func = callback
                kwargs['callback'] = func.__name__
            else:
                raise NotImplementedError("self.%s() not implemented!" % callback)
            if hasattr(func, '_config'):
                for k, v in iteritems(func._config):
                    if isinstance(v, dict) and isinstance(kwargs.get(k), dict):
                        kwargs[k].update(v)
                    else:
                        kwargs.setdefault(k, v)

        for k, v in iteritems(self.crawl_config):
            if isinstance(v, dict) and isinstance(kwargs.get(k), dict):
                kwargs[k].update(v)
            else:
                kwargs.setdefault(k, v)

        url = quote_chinese(_build_url(url.strip(), kwargs.pop('params', None)))
        if kwargs.get('files'):
            assert isinstance(
                kwargs.get('data', {}), dict), "data must be a dict when using with files!"
            content_type, data = _encode_multipart_formdata(kwargs.pop('data', {}),
                                                            kwargs.pop('files', {}))
            kwargs.setdefault('headers', {})
            kwargs['headers']['Content-Type'] = content_type
            kwargs['data'] = data
        if kwargs.get('data'):
            kwargs['data'] = _encode_params(kwargs['data'])
        if kwargs.get('data'):
            kwargs.setdefault('method', 'POST')

        schedule = {}
        for key in ('priority', 'retries', 'exetime', 'age', 'itag', 'force_update',
                    'auto_recrawl'):
            if key in kwargs:
                schedule[key] = kwargs.pop(key)
        task['schedule'] = schedule

        fetch = {}
        for key in (
                'method',
                'headers',
                'data',
                'timeout',
                'allow_redirects',
                'cookies',
                'proxy',
                'etag',
                'last_modifed',
                'last_modified',
                'save',
                'js_run_at',
                'js_script',
                'js_viewport_width',
                'js_viewport_height',
                'load_images',
                'fetch_type',
                'use_gzip',
                'validate_cert',
                'max_redirects',
                'robots_txt'
        ):
            if key in kwargs:
                fetch[key] = kwargs.pop(key)
        task['fetch'] = fetch

        process = {}
        for key in ('callback', ):
            if key in kwargs:
                process[key] = kwargs.pop(key)
        task['process'] = process

        task['project'] = self.project_name
        task['url'] = url
        if 'taskid' in kwargs:
            task['taskid'] = kwargs.pop('taskid')
        else:
            task['taskid'] = self.get_taskid(task)

        if kwargs:
            raise TypeError('crawl() got unexpected keyword argument: %s' % kwargs.keys())

        cache_key = "%(project)s:%(taskid)s" % task
        if cache_key not in self._follows_keys:
            self._follows_keys.add(cache_key)
            self._follows.append(task)
        return task
```
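The two merge loops near the top of `_crawl` decide how per-call arguments, per-callback `_config` values, and the project-wide `crawl_config` combine: dict-valued settings are merged key by key (the configured sub-keys overwrite the per-call ones), while everything else is applied with `setdefault` (the per-call value wins). A standalone sketch of that rule, with a hypothetical helper name:

```python
def merge_defaults(kwargs, defaults):
    """Hypothetical helper mirroring the merge rule used in _crawl."""
    for k, v in defaults.items():
        if isinstance(v, dict) and isinstance(kwargs.get(k), dict):
            kwargs[k].update(v)        # dict settings: default sub-keys overwrite
        else:
            kwargs.setdefault(k, v)    # scalar settings: per-call value wins
    return kwargs


call_kwargs = {'headers': {'X-Trace': '1'}, 'timeout': 30}
crawl_config = {'headers': {'User-Agent': 'pyspider'}, 'timeout': 120}
print(merge_defaults(call_kwargs, crawl_config))
# -> {'headers': {'X-Trace': '1', 'User-Agent': 'pyspider'}, 'timeout': 30}
```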
```python
    def get_taskid(self, task):
        '''Generate taskid by information of task md5(url) by default, override me'''
        return md5string(task['url'])
```
```python
    # apis
    def crawl(self, url, **kwargs):
        '''
        available params:
          url
          callback

          method
          params
          data
          files
          headers
          timeout
          allow_redirects
          cookies
          proxy
          etag
          last_modified
          auto_recrawl

          fetch_type
          js_run_at
          js_script
          js_viewport_width
          js_viewport_height
          load_images

          priority
          retries
          exetime
          age
          itag

          save
          taskid

          full documents: http://pyspider.readthedocs.org/en/latest/apis/self.crawl/
        '''

        if isinstance(url, six.string_types) and url.startswith('curl '):
            curl_kwargs = curl_to_arguments(url)
            url = curl_kwargs.pop('urls')
            for k, v in iteritems(curl_kwargs):
                kwargs.setdefault(k, v)

        if isinstance(url, six.string_types):
            return self._crawl(url, **kwargs)
        elif hasattr(url, "__iter__"):
            result = []
            for each in url:
                result.append(self._crawl(each, **kwargs))
            return result
```
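A typical script built on this API, using a handful of the parameters listed in the docstring above (the URL, selector, and values are illustrative only):

```python
from pyspider.libs.base_handler import BaseHandler


class Handler(BaseHandler):
    crawl_config = {
        'headers': {'User-Agent': 'pyspider/example'},
    }

    def on_start(self):
        # Entry point: schedule the listing page; `age` keeps the task valid
        # for ten days, so it is not re-crawled within that window.
        self.crawl('http://example.com/list',
                   callback=self.index_page,
                   age=10 * 24 * 60 * 60)

    def index_page(self, response):
        # response.doc is a PyQuery object; follow every absolute link.
        for each in response.doc('a[href^="http"]').items():
            self.crawl(each.attr.href, callback=self.detail_page)

    def detail_page(self, response):
        return {'url': response.url, 'title': response.doc('title').text()}
```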
```python
    def is_debugger(self):
        """Return true if running in debugger"""
        return self.__env__.get('debugger')

    def send_message(self, project, msg, url='data:,on_message'):
        """Send messages to other project."""
        self._messages.append((project, msg, url))

    def on_message(self, project, msg):
        """Receive message from other project, override me."""
        pass

    def on_result(self, result):
        """Receiving returns from other callback, override me."""
        if not result:
            return
        assert self.task, "on_result can't be called outside a callback."
        if self.is_debugger():
            pprint(result)
        if self.__env__.get('result_queue'):
            self.__env__['result_queue'].put((self.task, result))

    @not_send_status
    def _on_message(self, response):
        project, msg = response.save
        return self.on_message(project, msg)

    @not_send_status
    def _on_cronjob(self, response, task):
        if (not response.save
                or not isinstance(response.save, dict)
                or 'tick' not in response.save):
            return

        # When triggered, an '_on_cronjob' task is sent from the scheduler with 'tick' in
        # Response.save. The scheduler may at least send the trigger task every GCD of the
        # intervals of the cronjobs. The method should check the tick for each cronjob
        # function to confirm the execution interval.
        for cronjob in self._cron_jobs:
            if response.save['tick'] % cronjob.tick != 0:
                continue
            function = cronjob.__get__(self, self.__class__)
            self._run_func(function, response, task)

    def _on_get_info(self, response, task):
        """Sending runtime information about this script."""
        for each in response.save or []:
            if each == 'min_tick':
                self.save[each] = self._min_tick
            elif each == 'retry_delay':
                if not isinstance(self.retry_delay, dict):
                    self.retry_delay = {'': self.retry_delay}
                self.save[each] = self.retry_delay

    @not_send_status
    def on_finished(self, response, task):
        pass
```
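The `_cron_jobs` / `_on_cronjob` machinery and the `send_message` / `on_message` pair are normally driven from a project script. A short sketch, assuming the `@every` decorator exported by the same module (the project name and URLs are illustrative):

```python
from pyspider.libs.base_handler import BaseHandler, every


class MonitorHandler(BaseHandler):

    @every(minutes=60)
    def check_feed(self):
        # Registered as a cron job; _on_cronjob invokes it whenever the
        # scheduler's tick is a multiple of its interval.
        self.crawl('http://example.com/feed', callback=self.parse_feed)

    def parse_feed(self, response):
        # Forward data to another project's on_message callback.
        self.send_message('other_project', {'url': response.url})

    def on_message(self, project, msg):
        # Receive messages sent by other projects; a returned value flows
        # through on_result like any other callback return.
        return msg
```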