#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
# http://binux.me
# Created on 2014-02-16 22:59:56

import sys
import six
import time
import logging
import traceback
logger = logging.getLogger("processor")

from six.moves import queue as Queue
from pyspider.libs import utils
from pyspider.libs.log import LogFormatter
from pyspider.libs.utils import pretty_unicode, hide_me
from pyspider.libs.response import rebuild_response
from .project_module import ProjectManager, ProjectFinder


class ProcessorResult(object):
    """The result and logs produced by a callback"""

    def __init__(self, result=None, follows=(), messages=(),
                 logs=(), exception=None, extinfo=None, save=None):
        if extinfo is None:
            extinfo = {}
        self.result = result
        self.follows = follows
        self.messages = messages
        self.logs = logs
        self.exception = exception
        self.extinfo = extinfo
        self.save = save

    def rethrow(self):
        """Re-raise the exception captured during processing, if any"""

        if self.exception:
            raise self.exception

    def logstr(self):
        """Format the collected log records into a single string"""

        result = []
        formatter = LogFormatter(color=False)
        for record in self.logs:
            if isinstance(record, six.string_types):
                result.append(pretty_unicode(record))
            else:
                if record.exc_info:
                    a, b, tb = record.exc_info
                    tb = hide_me(tb, globals())
                    record.exc_info = a, b, tb
                result.append(pretty_unicode(formatter.format(record)))
                result.append(u'\n')
        return u''.join(result)

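# Illustrative sketch (not part of the module): a ProcessorResult built from a
# plain-string log formats it verbatim, and rethrow() is a no-op when no
# exception was recorded.
#
#   ret = ProcessorResult(result={'title': 'example'}, logs=('fetched ok',))
#   ret.rethrow()        # nothing raised, self.exception is None
#   text = ret.logstr()  # -> u'fetched ok'
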
class Processor(object):
    PROCESS_TIME_LIMIT = 30     # seconds allowed for running a single callback
    EXCEPTION_LIMIT = 3         # consecutive loop errors tolerated in run()

    RESULT_LOGS_LIMIT = 1000    # only the last 1000 characters of logs are reported
    RESULT_RESULT_LIMIT = 10    # only the first 10 characters of the result are reported

    def __init__(self, projectdb, inqueue, status_queue, newtask_queue, result_queue,
                 enable_stdout_capture=True,
                 enable_projects_import=True,
                 process_time_limit=PROCESS_TIME_LIMIT):
        self.inqueue = inqueue
        self.status_queue = status_queue
        self.newtask_queue = newtask_queue
        self.result_queue = result_queue
        self.projectdb = projectdb
        self.enable_stdout_capture = enable_stdout_capture

        self._quit = False
        self._exceptions = 10
        self.project_manager = ProjectManager(projectdb, dict(
            result_queue=self.result_queue,
            enable_stdout_capture=self.enable_stdout_capture,
            process_time_limit=process_time_limit,
        ))

        if enable_projects_import:
            self.enable_projects_import()
    def enable_projects_import(self):
        '''
        Enable importing other projects as modules

        `from project import project_name`
        '''
        sys.meta_path.append(ProjectFinder(self.projectdb))

    def __del__(self):
        pass
    def on_task(self, task, response):
        '''Process one task'''
        start_time = time.time()
        response = rebuild_response(response)

        try:
            assert 'taskid' in task, 'need taskid in task'
            project = task['project']
            updatetime = task.get('project_updatetime', None)
            md5sum = task.get('project_md5sum', None)
            project_data = self.project_manager.get(project, updatetime, md5sum)
            assert project_data, "no such project!"
            if project_data.get('exception'):
                ret = ProcessorResult(logs=(project_data.get('exception_log'), ),
                                      exception=project_data['exception'])
            else:
                ret = project_data['instance'].run_task(
                    project_data['module'], task, response)
        except Exception as e:
            logstr = traceback.format_exc()
            ret = ProcessorResult(logs=(logstr, ), exception=e)
        process_time = time.time() - start_time

        if not ret.extinfo.get('not_send_status', False):
            # keep every response header for debugging when the callback failed;
            # otherwise track only the cache-validation headers
            if ret.exception:
                track_headers = dict(response.headers)
            else:
                track_headers = {}
                for name in ('etag', 'last-modified'):
                    if name not in response.headers:
                        continue
                    track_headers[name] = response.headers[name]
            status_pack = {
                'taskid': task['taskid'],
                'project': task['project'],
                'url': task.get('url'),
                'track': {
                    'fetch': {
                        'ok': response.isok(),
                        'redirect_url': response.url if response.url != response.orig_url else None,
                        'time': response.time,
                        'error': response.error,
                        'status_code': response.status_code,
                        'encoding': getattr(response, '_encoding', None),
                        'headers': track_headers,
                        'content': response.text[:500] if ret.exception else None,
                    },
                    'process': {
                        'ok': not ret.exception,
                        'time': process_time,
                        'follows': len(ret.follows),
                        'result': (
                            None if ret.result is None
                            else utils.text(ret.result)[:self.RESULT_RESULT_LIMIT]
                        ),
                        'logs': ret.logstr()[-self.RESULT_LOGS_LIMIT:],
                        'exception': ret.exception,
                    },
                    'save': ret.save,
                },
            }
            if 'schedule' in task:
                status_pack['schedule'] = task['schedule']
167
|
|
|
# FIXME: unicode_obj should used in scheduler before store to database |
168
|
|
|
# it's used here for performance. |
169
|
|
|
self.status_queue.put(utils.unicode_obj(status_pack)) |
170
|
|
|
|
171
|
|
|
# FIXME: unicode_obj should used in scheduler before store to database |
172
|
|
|
# it's used here for performance. |
173
|
|
|
if ret.follows: |
174
|
|
|
for each in (ret.follows[x:x + 1000] for x in range(0, len(ret.follows), 1000)): |
175
|
|
|
self.newtask_queue.put([utils.unicode_obj(newtask) for newtask in each]) |
176
|
|
|
|

        for project, msg, url in ret.messages:
            # each message is delivered by recursively processing a synthetic
            # task whose callback is '_on_message'
            try:
                self.on_task({
                    'taskid': utils.md5string(url),
                    'project': project,
                    'url': url,
                    'process': {
                        'callback': '_on_message',
                    }
                }, {
                    'status_code': 200,
                    'url': url,
                    'save': (task['project'], msg),
                })
            except Exception as e:
                logger.exception('Sending message error.')
                continue
        if ret.exception:
            logger_func = logger.error
        else:
            logger_func = logger.info
        logger_func('process %s:%s %s -> [%d] len:%d -> result:%.10r fol:%d msg:%d err:%r' % (
            task['project'], task['taskid'],
            task.get('url'), response.status_code, len(response.content),
            ret.result, len(ret.follows), len(ret.messages), ret.exception))
        return True
    def quit(self):
        '''Set quit signal'''
        self._quit = True

    def run(self):
        '''Run loop'''
        logger.info("processor starting...")

        while not self._quit:
            try:
                task, response = self.inqueue.get(timeout=1)
                self.on_task(task, response)
                self._exceptions = 0
            except Queue.Empty as e:
                continue
            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.exception(e)
                self._exceptions += 1
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break
                continue

        logger.info("processor exiting...")
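
# A minimal wiring sketch (illustrative only, not part of this module): the queue
# objects and the projectdb handle normally come from pyspider's launcher, and
# `projectdb` below is a placeholder for a real project database instance.
#
#   from six.moves import queue
#   inqueue, status_q, newtask_q, result_q = (queue.Queue() for _ in range(4))
#   processor = Processor(projectdb, inqueue, status_q, newtask_q, result_q)
#   processor.run()    # blocks, pulling (task, response) pairs until quit() is called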