Processor.run()   B
last analyzed

Complexity

Conditions 6

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
dl 0
loc 21
rs 7.8867
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-16 22:59:56
7
8
import sys
9
import six
10
import time
11
import logging
12
import traceback
13
logger = logging.getLogger("processor")
14
15
from six.moves import queue as Queue
16
from pyspider.libs import utils
17
from pyspider.libs.log import LogFormatter
18
from pyspider.libs.utils import pretty_unicode, hide_me
19
from pyspider.libs.response import rebuild_response
20
from .project_module import ProjectManager, ProjectFinder
21
22
23
class ProcessorResult(object):
    """Bundle of everything one callback run produced.

    Holds the callback result, follow-up tasks, messages, log records,
    the exception (if any), extra info and the ``save`` payload.
    """

    def __init__(self, result=None, follows=(), messages=(),
                 logs=(), exception=None, extinfo=None, save=None):
        """Store each argument on the instance under the same name.

        ``extinfo`` becomes a fresh empty dict when ``None`` is passed, so
        instances never share a default dictionary.
        """
        self.result = result
        self.follows = follows
        self.messages = messages
        self.logs = logs
        self.exception = exception
        self.extinfo = {} if extinfo is None else extinfo
        self.save = save

    def rethrow(self):
        """Raise the stored exception; do nothing when there is none."""
        if not self.exception:
            return
        raise self.exception

    def logstr(self):
        """Join all collected logs into one unicode string.

        Plain strings are appended as they are (via ``pretty_unicode``).
        Any other entry is treated as a log record: it is rendered with a
        colourless ``LogFormatter`` and followed by a newline.
        """
        log_formatter = LogFormatter(color=False)
        chunks = []
        for entry in self.logs:
            if isinstance(entry, six.string_types):
                chunks.append(pretty_unicode(entry))
                continue

            if entry.exc_info:
                # The record is updated in place: its traceback is replaced by
                # whatever hide_me returns for it and this module's globals.
                exc_type, exc_value, trace = entry.exc_info
                trace = hide_me(trace, globals())
                entry.exc_info = exc_type, exc_value, trace
            chunks.append(pretty_unicode(log_formatter.format(entry)))
            chunks.append(u'\n')
        return u''.join(chunks)
60
61
62
class Processor(object):
    """Worker that runs project callbacks on fetched tasks.

    It takes ``(task, response)`` pairs from ``inqueue``, runs the matching
    project instance on them, and then publishes a status pack to
    ``status_queue`` and follow-up tasks to ``newtask_queue``.
    """

    # Default value handed to ProjectManager as ``process_time_limit``.
    PROCESS_TIME_LIMIT = 30
    # run() leaves its loop once the failure counter exceeds this value.
    EXCEPTION_LIMIT = 3

    # Only the last RESULT_LOGS_LIMIT characters of the log text are reported.
    RESULT_LOGS_LIMIT = 1000
    # Only the first RESULT_RESULT_LIMIT characters of the result text are reported.
    RESULT_RESULT_LIMIT = 10

    def __init__(self, projectdb, inqueue, status_queue, newtask_queue, result_queue,
                 enable_stdout_capture=True,
                 enable_projects_import=True,
                 process_time_limit=PROCESS_TIME_LIMIT):
        """Keep references to the queues and build the ProjectManager.

        When ``enable_projects_import`` is true, the project import hook is
        installed as well (see ``enable_projects_import``).
        """
        self.projectdb = projectdb
        self.inqueue = inqueue
        self.status_queue = status_queue
        self.newtask_queue = newtask_queue
        self.result_queue = result_queue
        self.enable_stdout_capture = enable_stdout_capture

        self._quit = False
        # The counter starts above the default EXCEPTION_LIMIT, so a failure
        # before the first successfully handled task ends run() right away.
        self._exceptions = 10

        manager_env = {
            'result_queue': self.result_queue,
            'enable_stdout_capture': self.enable_stdout_capture,
            'process_time_limit': process_time_limit,
        }
        self.project_manager = ProjectManager(projectdb, manager_env)

        if enable_projects_import:
            self.enable_projects_import()

    def enable_projects_import(self):
        '''
        Enable importing another project as a module

        `from project import project_name`
        '''
        finder = ProjectFinder(self.projectdb)
        sys.meta_path.append(finder)

    def __del__(self):
        pass

    def _fetch_track(self, response, ret, track_headers):
        """Build the ``track.fetch`` part of the status pack."""
        return {
            'ok': response.isok(),
            'redirect_url': response.url if response.url != response.orig_url else None,
            'time': response.time,
            'error': response.error,
            'status_code': response.status_code,
            'encoding': getattr(response, '_encoding', None),
            'headers': track_headers,
            # The start of the body is reported only for failed runs.
            'content': response.text[:500] if ret.exception else None,
        }

    def _process_track(self, ret, process_time):
        """Build the ``track.process`` part of the status pack."""
        succeeded = not ret.exception
        follow_count = len(ret.follows)
        if ret.result is None:
            result_text = None
        else:
            result_text = utils.text(ret.result)[:self.RESULT_RESULT_LIMIT]
        return {
            'ok': succeeded,
            'time': process_time,
            'follows': follow_count,
            'result': result_text,
            'logs': ret.logstr()[-self.RESULT_LOGS_LIMIT:],
            'exception': ret.exception,
        }

    def on_task(self, task, response):
        '''Handle one task and publish its status, follow-ups and messages.

        Returns True. Errors raised while loading or running the project are
        captured into the result instead of propagating.
        '''
        started = time.time()
        response = rebuild_response(response)

        try:
            assert 'taskid' in task, 'need taskid in task'
            project = task['project']
            project_updatetime = task.get('project_updatetime', None)
            project_md5sum = task.get('project_md5sum', None)
            project_data = self.project_manager.get(
                project, project_updatetime, project_md5sum)
            assert project_data, "no such project!"
            if project_data.get('exception'):
                # The project itself is broken: report its stored exception.
                ret = ProcessorResult(logs=(project_data.get('exception_log'), ),
                                      exception=project_data['exception'])
            else:
                ret = project_data['instance'].run_task(
                    project_data['module'], task, response)
        except Exception as e:
            ret = ProcessorResult(logs=(traceback.format_exc(), ), exception=e)
        process_time = time.time() - started

        if not ret.extinfo.get('not_send_status', False):
            if ret.exception:
                # Failed runs report every response header.
                track_headers = dict(response.headers)
            else:
                # Successful runs report only these two headers, when present.
                track_headers = {
                    name: response.headers[name]
                    for name in ('etag', 'last-modified')
                    if name in response.headers
                }

            status_pack = {
                'taskid': task['taskid'],
                'project': task['project'],
                'url': task.get('url'),
                'track': {
                    'fetch': self._fetch_track(response, ret, track_headers),
                    'process': self._process_track(ret, process_time),
                    'save': ret.save,
                },
            }
            if 'schedule' in task:
                status_pack['schedule'] = task['schedule']

            # FIXME: unicode_obj should be used in the scheduler before storing
            # to the database; it's used here for performance.
            self.status_queue.put(utils.unicode_obj(status_pack))

        # FIXME: unicode_obj should be used in the scheduler before storing
        # to the database; it's used here for performance.
        if ret.follows:
            # Follow-up tasks are queued in batches of at most 1000.
            for offset in range(0, len(ret.follows), 1000):
                batch = ret.follows[offset:offset + 1000]
                self.newtask_queue.put([utils.unicode_obj(item) for item in batch])

        for target_project, msg, url in ret.messages:
            # Each message is delivered by processing a synthetic task that
            # targets the receiving project's '_on_message' callback.
            try:
                message_task = {
                    'taskid': utils.md5string(url),
                    'project': target_project,
                    'url': url,
                    'process': {
                        'callback': '_on_message',
                    }
                }
                message_response = {
                    'status_code': 200,
                    'url': url,
                    'save': (task['project'], msg),
                }
                self.on_task(message_task, message_response)
            except Exception:
                logger.exception('Sending message error.')

        report = logger.error if ret.exception else logger.info
        summary = 'process %s:%s %s -> [%d] len:%d -> result:%.10r fol:%d msg:%d err:%r' % (
            task['project'], task['taskid'],
            task.get('url'), response.status_code, len(response.content),
            ret.result, len(ret.follows), len(ret.messages), ret.exception)
        report(summary)
        return True

    def quit(self):
        '''Ask the run loop to stop'''
        self._quit = True

    def run(self):
        '''Process tasks from the input queue until told to quit.

        The loop also ends on KeyboardInterrupt, or when the failure counter
        exceeds EXCEPTION_LIMIT. The counter resets after every task that is
        handled without raising.
        '''
        logger.info("processor starting...")

        while not self._quit:
            try:
                task, response = self.inqueue.get(timeout=1)
                self.on_task(task, response)
                self._exceptions = 0
            except Queue.Empty:
                # Nothing arrived within the timeout; check _quit and retry.
                pass
            except KeyboardInterrupt:
                break
            except Exception as err:
                logger.exception(err)
                self._exceptions += 1
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break

        logger.info("processor exiting...")
230