Completed
Push — master ( 39eece...c8d455 )
by Roy
01:11
created

Processor.__del__()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 2
rs 10
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-16 22:59:56
7
8
import sys
9
import six
10
import time
11
import logging
12
import traceback
13
# Module-level logger; Processor.on_task and Processor.run report through it.
logger = logging.getLogger("processor")
14
15
from six.moves import queue as Queue
16
from pyspider.libs import utils
17
from pyspider.libs.log import LogFormatter
18
from pyspider.libs.utils import pretty_unicode, hide_me
19
from pyspider.libs.response import rebuild_response
20
from .project_module import ProjectManager, ProjectFinder
21
22
23
class ProcessorResult(object):
    """The result and logs produced by a callback.

    Attributes:
        result: value returned by the callback, or None.
        follows: sequence of new tasks emitted by the callback.
        messages: sequence of ``(project, msg, url)`` entries to deliver.
        logs: sequence of log entries; each one is either a string or a
            log record that ``LogFormatter`` can format.
        exception: exception captured while processing, or None.
        extinfo: dict of extra flags (``Processor.on_task`` reads
            ``not_send_status`` from it).
        save: value forwarded unchanged in the status pack.
    """

    def __init__(self, result=None, follows=(), messages=(),
                 logs=(), exception=None, extinfo=None, save=None):
        # A literal ``{}`` default is created once and shared by every
        # instance built without ``extinfo``; a flag set on one result would
        # leak into all later ones. Build a fresh dict per instance instead.
        if extinfo is None:
            extinfo = {}
        self.result = result
        self.follows = follows
        self.messages = messages
        self.logs = logs
        self.exception = exception
        self.extinfo = extinfo
        self.save = save

    def rethrow(self):
        """Re-raise the stored exception; do nothing when there is none."""

        if self.exception:
            raise self.exception

    def logstr(self):
        """Render the collected log entries as one unicode string.

        String entries are appended as they are. Log records are formatted
        with an uncoloured ``LogFormatter`` and followed by a newline.

        Note: a record carrying ``exc_info`` is modified in place -- its
        traceback is replaced by the one returned from ``hide_me``.
        """

        result = []
        formatter = LogFormatter(color=False)
        for record in self.logs:
            if isinstance(record, six.string_types):
                result.append(pretty_unicode(record))
            else:
                if record.exc_info:
                    a, b, tb = record.exc_info
                    tb = hide_me(tb, globals())
                    record.exc_info = a, b, tb
                result.append(pretty_unicode(formatter.format(record)))
                result.append(u'\n')
        return u''.join(result)
58
59
60
class Processor(object):
    """Run project callbacks on fetched tasks and publish what they produce.

    Each ``(task, response)`` pair read from ``inqueue`` is handed to the
    project instance obtained from the ``ProjectManager``. The outcome is
    then published: a status pack on ``status_queue`` and follow-up tasks on
    ``newtask_queue``. ``result_queue`` is passed on to the
    ``ProjectManager``.
    """

    # Not referenced by the code of this class.
    # NOTE(review): presumably a per-task time budget in seconds that is
    # enforced elsewhere -- confirm.
    PROCESS_TIME_LIMIT = 30
    # run() stops once the failure counter exceeds this value; the counter is
    # reset to 0 after every successfully handled task.
    EXCEPTION_LIMIT = 3

    # A status pack keeps only the last RESULT_LOGS_LIMIT characters of the
    # logs and the first RESULT_RESULT_LIMIT characters of the result text.
    RESULT_LOGS_LIMIT = 1000
    RESULT_RESULT_LIMIT = 10

    def __init__(self, projectdb, inqueue, status_queue, newtask_queue, result_queue,
                 enable_stdout_capture=True,
                 enable_projects_import=True):
        """Store the queues and build the project manager.

        Args:
            projectdb: project database handed to ``ProjectManager`` (and to
                ``ProjectFinder`` when project imports are enabled).
            inqueue: queue that yields ``(task, response)`` pairs.
            status_queue: queue receiving one status pack per handled task.
            newtask_queue: queue receiving batches of follow-up tasks.
            result_queue: queue passed through to ``ProjectManager``.
            enable_stdout_capture: passed through to ``ProjectManager``.
            enable_projects_import: when true, ``enable_projects_import()``
                is called at the end of construction.
        """
        self.inqueue = inqueue
        self.status_queue = status_queue
        self.newtask_queue = newtask_queue
        self.result_queue = result_queue
        self.projectdb = projectdb
        self.enable_stdout_capture = enable_stdout_capture

        self._quit = False
        # Starts above the default EXCEPTION_LIMIT, so a failure before the
        # first successfully handled task makes run() stop immediately.
        self._exceptions = 10
        self.project_manager = ProjectManager(projectdb, dict(
            result_queue=self.result_queue,
            enable_stdout_capture=self.enable_stdout_capture,
        ))

        if enable_projects_import:
            self.enable_projects_import()

    def enable_projects_import(self):
        '''
        Enable importing another project as a module

        `from project import project_name`

        Only takes effect on Python 2, where a ``ProjectFinder`` is appended
        to ``sys.meta_path``.
        '''
        if six.PY2:
            sys.meta_path.append(ProjectFinder(self.projectdb))

    def __del__(self):
        # Intentionally empty: nothing is released on finalisation.
        pass

    def on_task(self, task, response):
        '''Deal with one task.

        Runs the project callback for ``task`` and then publishes the
        outcome: a status pack on ``status_queue`` (unless the result's
        ``extinfo`` sets ``not_send_status``), follow-up tasks on
        ``newtask_queue`` in batches of at most 1000, and each message as a
        nested ``on_task`` call targeting the ``_on_message`` callback.

        Args:
            task: dict describing the task. ``taskid`` and ``project`` are
                required; ``url``, ``project_updatetime``,
                ``project_md5sum`` and ``schedule`` are read when present.
            response: raw response, converted with ``rebuild_response``.

        Returns:
            True.

        Raises:
            KeyError: when ``task`` lacks ``taskid`` or ``project``; the
                failure is captured in the result first, but building the
                status pack and the summary log line index those keys
                directly.

        Any other exception raised while loading or running the project is
        captured into the ``ProcessorResult`` rather than propagated.
        '''
        start_time = time.time()
        response = rebuild_response(response)

        try:
            # Raised explicitly instead of through ``assert`` so the checks
            # survive ``python -O``. AssertionError is kept so the exception
            # type recorded in the result does not change.
            if 'taskid' not in task:
                raise AssertionError('need taskid in task')
            project = task['project']
            updatetime = task.get('project_updatetime', None)
            md5sum = task.get('project_md5sum', None)
            project_data = self.project_manager.get(project, updatetime, md5sum)
            if not project_data:
                raise AssertionError("no such project!")
            if project_data.get('exception'):
                # The project itself is broken: report its stored error.
                ret = ProcessorResult(logs=(project_data.get('exception_log'), ),
                                      exception=project_data['exception'])
            else:
                ret = project_data['instance'].run_task(
                    project_data['module'], task, response)
        except Exception as e:
            logstr = traceback.format_exc()
            ret = ProcessorResult(logs=(logstr, ), exception=e)
        process_time = time.time() - start_time

        if not ret.extinfo.get('not_send_status', False):
            if ret.exception:
                # On failure keep every header to help debugging.
                track_headers = dict(response.headers)
            else:
                # On success only these two headers are tracked.
                track_headers = {}
                for name in ('etag', 'last-modified'):
                    if name in response.headers:
                        track_headers[name] = response.headers[name]

            status_pack = {
                'taskid': task['taskid'],
                'project': task['project'],
                'url': task.get('url'),
                'track': {
                    'fetch': {
                        'ok': response.isok(),
                        'redirect_url': response.url if response.url != response.orig_url else None,
                        'time': response.time,
                        'error': response.error,
                        'status_code': response.status_code,
                        'encoding': response.encoding,
                        'headers': track_headers,
                        # The page excerpt is only kept for failed tasks.
                        'content': response.text[:500] if ret.exception else None,
                    },
                    'process': {
                        'ok': not ret.exception,
                        'time': process_time,
                        'follows': len(ret.follows),
                        'result': (
                            None if ret.result is None
                            else utils.text(ret.result)[:self.RESULT_RESULT_LIMIT]
                        ),
                        'logs': ret.logstr()[-self.RESULT_LOGS_LIMIT:],
                        'exception': ret.exception,
                    },
                    'save': ret.save,
                },
            }
            if 'schedule' in task:
                status_pack['schedule'] = task['schedule']

            # FIXME: unicode_obj should be used in the scheduler before
            # storing to the database; it is used here for performance.
            self.status_queue.put(utils.unicode_obj(status_pack))

        # FIXME: unicode_obj should be used in the scheduler before storing
        # to the database; it is used here for performance.
        if ret.follows:
            # Hand new tasks over in batches of at most 1000.
            for start in range(0, len(ret.follows), 1000):
                batch = ret.follows[start:start + 1000]
                self.newtask_queue.put([utils.unicode_obj(newtask) for newtask in batch])

        for project, msg, url in ret.messages:
            try:
                # Deliver the message as a synthetic task to the target
                # project's `_on_message` callback; the sender's project
                # name travels in `save`.
                self.on_task({
                    'taskid': utils.md5string(url),
                    'project': project,
                    'url': url,
                    'process': {
                        'callback': '_on_message',
                    }
                }, {
                    'status_code': 200,
                    'url': url,
                    'save': (task['project'], msg),
                })
            except Exception:
                # Best effort: one failed delivery must not block the rest.
                logger.exception('Sending message error.')

        if ret.exception:
            logger_func = logger.error
        else:
            logger_func = logger.info
        # Arguments are passed separately so the message is only formatted
        # when the log level is enabled.
        logger_func(
            'process %s:%s %s -> [%d] len:%d -> result:%.10r fol:%d msg:%d err:%r',
            task['project'], task['taskid'],
            task.get('url'), response.status_code, len(response.content),
            ret.result, len(ret.follows), len(ret.messages), ret.exception)
        return True

    def quit(self):
        '''Set the quit signal; run() leaves its loop at the next check.'''
        self._quit = True

    def run(self):
        '''Run loop.

        Polls ``inqueue`` with a one second timeout and handles each pair
        with ``on_task`` until ``quit()`` is called, a KeyboardInterrupt
        arrives, or the failure counter exceeds ``EXCEPTION_LIMIT``.
        '''
        logger.info("processor starting...")

        while not self._quit:
            try:
                task, response = self.inqueue.get(timeout=1)
                self.on_task(task, response)
                # A handled task resets the failure counter.
                self._exceptions = 0
            except Queue.Empty:
                # Nothing to do yet; re-check the quit flag and poll again.
                continue
            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.exception(e)
                self._exceptions += 1
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break
                continue

        logger.info("processor exiting...")
227