#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
# http://binux.me
# Created on 2014-02-07 17:05:11


import os
import json
import time
import logging
import itertools
from collections import deque

from six import iteritems, itervalues

from pyspider.libs import counter, utils
from six.moves import queue as Queue
from .task_queue import TaskQueue

logger = logging.getLogger('scheduler')


class Scheduler(object):
    UPDATE_PROJECT_INTERVAL = 5 * 60
    default_schedule = {
        'priority': 0,
        'retries': 3,
        'exetime': 0,
        'age': -1,
        'itag': None,
    }
    LOOP_LIMIT = 1000
    LOOP_INTERVAL = 0.1
    ACTIVE_TASKS = 100
    INQUEUE_LIMIT = 0
    EXCEPTION_LIMIT = 3
    DELETE_TIME = 24 * 60 * 60
    DEFAULT_RETRY_DELAY = {
        0: 30,
        1: 1 * 60 * 60,
        2: 6 * 60 * 60,
        3: 12 * 60 * 60,
        '': 24 * 60 * 60
    }
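    # DEFAULT_RETRY_DELAY maps the number of retries already performed to the
    # delay in seconds before the next attempt: 30s after the first failure,
    # then 1h, 6h and 12h; the '' key is the fallback for retry counts without
    # an explicit entry. A project can override this table via a `retry_delay`
    # attribute collected through `_on_get_info` (see on_task_failed).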

    def __init__(self, taskdb, projectdb, newtask_queue, status_queue,
                 out_queue, data_path='./data', resultdb=None):
        self.taskdb = taskdb
        self.projectdb = projectdb
        self.resultdb = resultdb
        self.newtask_queue = newtask_queue
        self.status_queue = status_queue
        self.out_queue = out_queue
        self.data_path = data_path

        self._send_buffer = deque()
        self._quit = False
        self._exceptions = 0
        self.projects = dict()
        self._force_update_project = False
        self._last_update_project = 0
        self.task_queue = dict()
        self._last_tick = int(time.time())

        self._cnt = {
            "5m_time": counter.CounterManager(
                lambda: counter.TimebaseAverageEventCounter(30, 10)),
            "5m": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            "1h": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
            "1d": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(10 * 60, 24 * 6)),
            "all": counter.CounterManager(
                lambda: counter.TotalCounter()),
        }
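        # Each CounterManager keeps per-(project, event) counters; the two
        # constructor arguments multiply out to each window's span: 30 x 10s
        # for the ~5-minute windows, 60 x 60s for one hour, and 600s x 144 for
        # one day. 'all' is a monotonic total; the persistent counters are
        # reloaded from disk below.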
        self._cnt['1h'].load(os.path.join(self.data_path, 'scheduler.1h'))
        self._cnt['1d'].load(os.path.join(self.data_path, 'scheduler.1d'))
        self._cnt['all'].load(os.path.join(self.data_path, 'scheduler.all'))
        self._last_dump_cnt = 0

    def _update_projects(self):
        '''Check project update'''
        now = time.time()
        if (
                not self._force_update_project
                and self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now
        ):
            return
        for project in self.projectdb.check_update(self._last_update_project):
            self._update_project(project)
            logger.debug("project: %s updated.", project['name'])
        self._force_update_project = False
        self._last_update_project = now

    def _update_project(self, project):
        '''update one project'''
        if project['name'] not in self.projects:
            self.projects[project['name']] = {}
        self.projects[project['name']].update(project)
        self.projects[project['name']]['md5sum'] = utils.md5string(project['script'])
        if not self.projects[project['name']].get('active_tasks', None):
            self.projects[project['name']]['active_tasks'] = deque(maxlen=self.ACTIVE_TASKS)

        # load task queue when project is running and delete task_queue when project is stopped
        if project['status'] in ('RUNNING', 'DEBUG'):
            if project['name'] not in self.task_queue:
                self._load_tasks(project['name'])
            self.task_queue[project['name']].rate = project['rate']
            self.task_queue[project['name']].burst = project['burst']

            # update project runtime info from processor by sending a _on_get_info
            # request, result is in status_page.track.save
            self.on_select_task({
                'taskid': '_on_get_info',
                'project': project['name'],
                'url': 'data:,_on_get_info',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': ['min_tick', 'retry_delay'],
                },
                'process': {
                    'callback': '_on_get_info',
                },
            })
        else:
            if project['name'] in self.task_queue:
                self.task_queue[project['name']].rate = 0
                self.task_queue[project['name']].burst = 0
                del self.task_queue[project['name']]

        # look the counters up by project name, not by the project dict itself
        if project['name'] not in self._cnt['all']:
            self._update_project_cnt(project['name'])
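
    # `_on_get_info` is a virtual task: it is routed through fetcher and
    # processor like a normal crawl, but its callback only reports handler
    # metadata (min_tick, retry_delay) back via track.save, which
    # _check_task_done() merges into self.projects.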

    scheduler_task_fields = ['taskid', 'project', 'schedule', ]

    def _load_tasks(self, project):
        '''load tasks from database'''
        self.task_queue[project] = TaskQueue(rate=0, burst=0)
        for task in self.taskdb.load_tasks(
                self.taskdb.ACTIVE, project, self.scheduler_task_fields
        ):
            taskid = task['taskid']
            _schedule = task.get('schedule', self.default_schedule)
            priority = _schedule.get('priority', self.default_schedule['priority'])
            exetime = _schedule.get('exetime', self.default_schedule['exetime'])
            self.task_queue[project].put(taskid, priority, exetime)
        logger.debug('project: %s loaded %d tasks.', project, len(self.task_queue[project]))

        if self.projects[project]['status'] in ('RUNNING', 'DEBUG'):
            self.task_queue[project].rate = self.projects[project]['rate']
            self.task_queue[project].burst = self.projects[project]['burst']
        else:
            self.task_queue[project].rate = 0
            self.task_queue[project].burst = 0

        if project not in self._cnt['all']:
            self._update_project_cnt(project)
        self._cnt['all'].value((project, 'pending'), len(self.task_queue[project]))

    def _update_project_cnt(self, project):
        status_count = self.taskdb.status_count(project)
        self._cnt['all'].value(
            (project, 'success'),
            status_count.get(self.taskdb.SUCCESS, 0)
        )
        self._cnt['all'].value(
            (project, 'failed'),
            status_count.get(self.taskdb.FAILED, 0) + status_count.get(self.taskdb.BAD, 0)
        )
        self._cnt['all'].value(
            (project, 'pending'),
            status_count.get(self.taskdb.ACTIVE, 0)
        )

    def task_verify(self, task):
        '''
        return False if any of 'taskid', 'project', 'url' is not in task dict
        or project is not in task_queue
        '''
        for each in ('taskid', 'project', 'url', ):
            if each not in task or not task[each]:
                logger.error('%s not in task: %.200r', each, task)
                return False
        if task['project'] not in self.task_queue:
            logger.error('unknown project: %s', task['project'])
            return False
        return True

    def insert_task(self, task):
        '''insert task into database'''
        return self.taskdb.insert(task['project'], task['taskid'], task)

    def update_task(self, task):
        '''update task in database'''
        return self.taskdb.update(task['project'], task['taskid'], task)

    def put_task(self, task):
        '''put task to task queue'''
        _schedule = task.get('schedule', self.default_schedule)
        self.task_queue[task['project']].put(
            task['taskid'],
            priority=_schedule.get('priority', self.default_schedule['priority']),
            exetime=_schedule.get('exetime', self.default_schedule['exetime'])
        )

    def send_task(self, task, force=True):
        '''
        dispatch task to fetcher

        out queue may have a size limit to prevent blocking, so a send buffer is used
        '''
        try:
            self.out_queue.put_nowait(task)
        except Queue.Full:
            if force:
                self._send_buffer.appendleft(task)
            else:
                raise
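
    # When the bounded out queue is full and force=True, the task is parked in
    # self._send_buffer; _check_select() later retries buffered tasks with
    # force=False, so nothing is silently dropped.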

    def _check_task_done(self):
        '''Check status queue'''
        cnt = 0
        try:
            while True:
                task = self.status_queue.get_nowait()
                # check _on_get_info result here
                if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task:
                    self.projects[task['project']].update(task['track'].get('save') or {})
                    logger.info(
                        '%s on_get_info %r', task['project'], task['track'].get('save', {})
                    )
                    continue
                elif not self.task_verify(task):
                    continue
                self.on_task_status(task)
                cnt += 1
        except Queue.Empty:
            pass
        return cnt

    merge_task_fields = ['taskid', 'project', 'url', 'status', 'schedule', 'lastcrawltime']

    def _check_request(self):
        '''Check new task queue'''
        tasks = {}
        while len(tasks) < self.LOOP_LIMIT:
            try:
                task = self.newtask_queue.get_nowait()
            except Queue.Empty:
                break

            if isinstance(task, list):
                _tasks = task
            else:
                _tasks = (task, )

            for task in _tasks:
                if not self.task_verify(task):
                    continue

                if task['taskid'] in self.task_queue[task['project']]:
                    if not task.get('schedule', {}).get('force_update', False):
                        logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
                        continue

                if task['taskid'] in tasks:
                    if not task.get('schedule', {}).get('force_update', False):
                        continue

                tasks[task['taskid']] = task

        for task in itervalues(tasks):
            self.on_request(task)

        return len(tasks)
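
    # New tasks are drained in batches of at most LOOP_LIMIT and de-duplicated
    # by taskid; a task already queued (or already seen in this batch) is only
    # accepted again when it carries schedule.force_update.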

    def _check_cronjob(self):
        """Check projects' cronjob tick, return True when a new tick is sent"""
        now = time.time()
        self._last_tick = int(self._last_tick)
        if now - self._last_tick < 1:
            return False
        self._last_tick += 1
        for project in itervalues(self.projects):
            if project['status'] not in ('DEBUG', 'RUNNING'):
                continue
            if project.get('min_tick', 0) == 0:
                continue
            if self._last_tick % int(project['min_tick']) != 0:
                continue
            self.on_select_task({
                'taskid': '_on_cronjob',
                'project': project['name'],
                'url': 'data:,_on_cronjob',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': {
                        'tick': self._last_tick,
                    },
                },
                'process': {
                    'callback': '_on_cronjob',
                },
            })
        return True
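
    # The tick advances one whole second per call, and run_once() keeps
    # calling _check_cronjob() until it returns False, so a scheduler that
    # fell behind catches up one tick at a time. A handler declaring e.g.
    # @every(minutes=1) reports min_tick=60 through `_on_get_info`, and its
    # project then receives an `_on_cronjob` task whenever the tick is a
    # multiple of 60.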

    request_task_fields = [
        'taskid',
        'project',
        'url',
        'status',
        'schedule',
        'fetch',
        'process',
        'track',
        'lastcrawltime'
    ]

    def _check_select(self):
        '''Select task to fetch & process'''
        while self._send_buffer:
            _task = self._send_buffer.pop()
            try:
                # use force=False here to prevent automatic send_buffer append and get exception
                self.send_task(_task, False)
            except Queue.Full:
                self._send_buffer.append(_task)
                break

        if self.out_queue.full():
            return {}

        taskids = []
        cnt = 0
        cnt_dict = dict()
        limit = self.LOOP_LIMIT
        for project, task_queue in iteritems(self.task_queue):
            if cnt >= limit:
                break

            # task queue
            self.task_queue[project].check_update()
            project_cnt = 0

            # each project gets at most limit / 10 slots per round, so one
            # busy project cannot starve the others
            while cnt < limit and project_cnt < limit / 10:
                taskid = task_queue.get()
                if not taskid:
                    break

                taskids.append((project, taskid))
                project_cnt += 1
                cnt += 1
            cnt_dict[project] = project_cnt

        for project, taskid in taskids:
            self._load_put_task(project, taskid)

        return cnt_dict

    def _load_put_task(self, project, taskid):
        task = self.taskdb.get_task(project, taskid, fields=self.request_task_fields)
        if not task:
            return
        task = self.on_select_task(task)

    def _print_counter_log(self):
        # print top 5 active counters
        keywords = ('pending', 'success', 'retry', 'failed')
        total_cnt = {}
        project_actives = []
        project_fails = []
        for key in keywords:
            total_cnt[key] = 0
        for project, subcounter in iteritems(self._cnt['5m']):
            actives = 0
            for key in keywords:
                cnt = subcounter.get(key, None)
                if cnt:
                    cnt = cnt.sum
                    total_cnt[key] += cnt
                    actives += cnt

            project_actives.append((actives, project))

            fails = subcounter.get('failed', None)
            if fails:
                project_fails.append((fails.sum, project))

        top_2_fails = sorted(project_fails, reverse=True)[:2]
        # compare against the project names, not the (count, project) tuples
        top_2_fail_projects = set(project for _, project in top_2_fails)
        top_3_actives = sorted([x for x in project_actives if x[1] not in top_2_fail_projects],
                               reverse=True)[:5 - len(top_2_fails)]

        log_str = ("in 5m: new:%(pending)d,success:%(success)d,"
                   "retry:%(retry)d,failed:%(failed)d" % total_cnt)
        for _, project in itertools.chain(top_3_actives, top_2_fails):
            subcounter = self._cnt['5m'][project].to_dict(get_value='sum')
            log_str += " %s:%d,%d,%d,%d" % (project,
                                            subcounter.get('pending', 0),
                                            subcounter.get('success', 0),
                                            subcounter.get('retry', 0),
                                            subcounter.get('failed', 0))
        logger.info(log_str)

    def _dump_cnt(self):
        '''Dump counters to file'''
        self._cnt['1h'].dump(os.path.join(self.data_path, 'scheduler.1h'))
        self._cnt['1d'].dump(os.path.join(self.data_path, 'scheduler.1d'))
        self._cnt['all'].dump(os.path.join(self.data_path, 'scheduler.all'))

    def _try_dump_cnt(self):
        '''Dump counters every 60 seconds'''
        now = time.time()
        if now - self._last_dump_cnt > 60:
            self._last_dump_cnt = now
            self._dump_cnt()
            self._print_counter_log()

    def _check_delete(self):
        '''Check project delete'''
        now = time.time()
        for project in list(itervalues(self.projects)):
            if project['status'] != 'STOP':
                continue
            if now - project['updatetime'] < self.DELETE_TIME:
                continue
            if 'delete' not in self.projectdb.split_group(project['group']):
                continue

            logger.warning("deleting project: %s!", project['name'])
            if project['name'] in self.task_queue:
                self.task_queue[project['name']].rate = 0
                self.task_queue[project['name']].burst = 0
                del self.task_queue[project['name']]
            del self.projects[project['name']]
            self.taskdb.drop(project['name'])
            self.projectdb.drop(project['name'])
            if self.resultdb:
                self.resultdb.drop(project['name'])

    def __len__(self):
        return sum(len(x) for x in itervalues(self.task_queue))

    def quit(self):
        '''Set quit signal'''
        self._quit = True

    def run_once(self):
        '''consume queues and feed tasks to fetcher, once'''

        self._update_projects()
        self._check_task_done()
        self._check_request()
        while self._check_cronjob():
            pass
        self._check_select()
        self._check_delete()
        self._try_dump_cnt()

    def run(self):
        '''Start scheduler loop'''
        logger.info("loading projects")

        while not self._quit:
            try:
                time.sleep(self.LOOP_INTERVAL)
                self.run_once()
                self._exceptions = 0
            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.exception(e)
                self._exceptions += 1
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break
                continue

        logger.info("scheduler exiting...")
        self._dump_cnt()

    def trigger_on_start(self, project):
        '''trigger an on_start callback of project'''
        self.newtask_queue.put({
            "project": project,
            "taskid": "on_start",
            "url": "data:,on_start",
            "process": {
                "callback": "on_start",
            },
        })

    def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False):
        '''Start xmlrpc interface'''
        try:
            from six.moves.xmlrpc_server import SimpleXMLRPCServer
        except ImportError:
            from SimpleXMLRPCServer import SimpleXMLRPCServer

        server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
        server.register_introspection_functions()
        server.register_multicall_functions()

        server.register_function(self.quit, '_quit')
        server.register_function(self.__len__, 'size')

        def dump_counter(_time, _type):
            try:
                return self._cnt[_time].to_dict(_type)
            except Exception:
                logger.exception('')
        server.register_function(dump_counter, 'counter')

        def new_task(task):
            if self.task_verify(task):
                self.newtask_queue.put(task)
                return True
            return False
        server.register_function(new_task, 'newtask')

        def send_task(task):
            '''dispatch task to fetcher'''
            self.send_task(task)
            return True
        server.register_function(send_task, 'send_task')

        def update_project():
            self._force_update_project = True
        server.register_function(update_project, 'update_project')

        def get_active_tasks(project=None, limit=100):
            allowed_keys = set((
                'taskid',
                'project',
                'status',
                'url',
                'lastcrawltime',
                'updatetime',
                'track',
            ))
            track_allowed_keys = set((
                'ok',
                'time',
                'follows',
                'status_code',
            ))

            iters = [iter(x['active_tasks']) for k, x in iteritems(self.projects)
                     if x and (k == project if project else True)]
            tasks = [next(x, None) for x in iters]
            result = []

            while len(result) < limit and tasks and not all(x is None for x in tasks):
                updatetime, task = t = max(t for t in tasks if t)
                i = tasks.index(t)
                tasks[i] = next(iters[i], None)
                for key in list(task):
                    if key == 'track':
                        for k in list(task[key].get('fetch', [])):
                            if k not in track_allowed_keys:
                                del task[key]['fetch'][k]
                        for k in list(task[key].get('process', [])):
                            if k not in track_allowed_keys:
                                del task[key]['process'][k]
                    if key in allowed_keys:
                        continue
                    del task[key]
                result.append(t)
            # fix for "<type 'exceptions.TypeError'>:dictionary key must be string"
            # have no idea why
            return json.loads(json.dumps(result))
        server.register_function(get_active_tasks, 'get_active_tasks')
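
        # Example client interaction (a sketch, assuming the default bind/port
        # above and a running scheduler):
        #
        #     from six.moves.xmlrpc_client import ServerProxy
        #     rpc = ServerProxy('http://127.0.0.1:23333/')
        #     rpc.size()                 # number of queued tasks
        #     rpc.counter('5m', 'sum')   # 5-minute counter snapshot
        #     rpc.update_project()       # force a project reload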

        server.timeout = 0.5
        while not self._quit:
            server.handle_request()
        server.server_close()

    def on_request(self, task):
        if self.INQUEUE_LIMIT and len(self.task_queue[task['project']]) >= self.INQUEUE_LIMIT:
            logger.debug('overflow task %(project)s:%(taskid)s %(url)s', task)
            return

        oldtask = self.taskdb.get_task(task['project'], task['taskid'],
                                       fields=self.merge_task_fields)
        if oldtask:
            return self.on_old_request(task, oldtask)
        else:
            return self.on_new_request(task)

    def on_new_request(self, task):
        '''Called when a new request arrives'''
        task['status'] = self.taskdb.ACTIVE
        self.insert_task(task)
        self.put_task(task)

        project = task['project']
        self._cnt['5m'].event((project, 'pending'), +1)
        self._cnt['1h'].event((project, 'pending'), +1)
        self._cnt['1d'].event((project, 'pending'), +1)
        self._cnt['all'].event((project, 'pending'), +1)
        logger.info('new task %(project)s:%(taskid)s %(url)s', task)
        return task

    def on_old_request(self, task, old_task):
        '''Called when a request for an already crawled task arrives'''
        now = time.time()

        _schedule = task.get('schedule', self.default_schedule)
        old_schedule = old_task.get('schedule', {})

        restart = False
        schedule_age = _schedule.get('age', self.default_schedule['age'])
        if _schedule.get('itag') and _schedule['itag'] != old_schedule.get('itag'):
            restart = True
        elif schedule_age >= 0 and schedule_age + (old_task.get('lastcrawltime', 0) or 0) < now:
            restart = True
        elif _schedule.get('force_update'):
            restart = True

        if not restart:
            logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
            return

        task['status'] = self.taskdb.ACTIVE
        self.update_task(task)
        self.put_task(task)

        project = task['project']
        if old_task['status'] != self.taskdb.ACTIVE:
            self._cnt['5m'].event((project, 'pending'), +1)
            self._cnt['1h'].event((project, 'pending'), +1)
            self._cnt['1d'].event((project, 'pending'), +1)
        if old_task['status'] == self.taskdb.SUCCESS:
            self._cnt['all'].event((project, 'success'), -1).event((project, 'pending'), +1)
        elif old_task['status'] == self.taskdb.FAILED:
            self._cnt['all'].event((project, 'failed'), -1).event((project, 'pending'), +1)
        logger.info('restart task %(project)s:%(taskid)s %(url)s', task)
        return task
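
    # A known task is only re-crawled when one of three things holds: its itag
    # changed, its age expired (last crawl older than schedule.age seconds),
    # or the new request carries schedule.force_update.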

    def on_task_status(self, task):
        '''Called when a status pack arrives'''
        try:
            process_ok = task['track']['process']['ok']
            if not self.task_queue[task['project']].done(task['taskid']):
                logger.error('not processing pack: %(project)s:%(taskid)s %(url)s', task)
                return None
        except KeyError as e:
            logger.error("Bad status pack: %s", e)
            return None

        if process_ok:
            ret = self.on_task_done(task)
        else:
            ret = self.on_task_failed(task)

        if task['track']['fetch'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
                                       task['track']['fetch']['time'])
        if task['track']['process'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'process_time'),
                                       task['track']['process'].get('time'))
        self.projects[task['project']]['active_tasks'].appendleft((time.time(), task))
        return ret

    def on_task_done(self, task):
        '''Called when a task finished successfully, called by `on_task_status`'''
        task['status'] = self.taskdb.SUCCESS
        task['lastcrawltime'] = time.time()

        if 'schedule' in task:
            if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
                task['status'] = self.taskdb.ACTIVE
                next_exetime = task['schedule'].get('age')
                task['schedule']['exetime'] = time.time() + next_exetime
                self.put_task(task)
            else:
                del task['schedule']
        self.update_task(task)

        project = task['project']
        self._cnt['5m'].event((project, 'success'), +1)
        self._cnt['1h'].event((project, 'success'), +1)
        self._cnt['1d'].event((project, 'success'), +1)
        self._cnt['all'].event((project, 'success'), +1).event((project, 'pending'), -1)
        logger.info('task done %(project)s:%(taskid)s %(url)s', task)
        return task

    def on_task_failed(self, task):
        '''Called when a task failed, called by `on_task_status`'''

        if 'schedule' not in task:
            old_task = self.taskdb.get_task(task['project'], task['taskid'], fields=['schedule'])
            if old_task is None:
                logger.error('unknown status pack: %s', task)
                return
            task['schedule'] = old_task.get('schedule', {})

        retries = task['schedule'].get('retries', self.default_schedule['retries'])
        retried = task['schedule'].get('retried', 0)

        project_info = self.projects.get(task['project'], {})
        retry_delay = project_info.get('retry_delay', None) or self.DEFAULT_RETRY_DELAY
        next_exetime = retry_delay.get(retried, retry_delay.get('', self.DEFAULT_RETRY_DELAY['']))

        if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
            next_exetime = min(next_exetime, task['schedule'].get('age'))
        else:
            if retried >= retries:
                next_exetime = -1
            elif 'age' in task['schedule'] and next_exetime > task['schedule'].get('age'):
                next_exetime = task['schedule'].get('age')

        if next_exetime < 0:
            task['status'] = self.taskdb.FAILED
            task['lastcrawltime'] = time.time()
            self.update_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'failed'), +1)
            self._cnt['1h'].event((project, 'failed'), +1)
            self._cnt['1d'].event((project, 'failed'), +1)
            self._cnt['all'].event((project, 'failed'), +1).event((project, 'pending'), -1)
            logger.info('task failed %(project)s:%(taskid)s %(url)s', task)
            return task
        else:
            task['schedule']['retried'] = retried + 1
            task['schedule']['exetime'] = time.time() + next_exetime
            task['lastcrawltime'] = time.time()
            self.update_task(task)
            self.put_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'retry'), +1)
            self._cnt['1h'].event((project, 'retry'), +1)
            self._cnt['1d'].event((project, 'retry'), +1)
            # self._cnt['all'].event((project, 'retry'), +1)
            logger.info('task retry %d/%d %%(project)s:%%(taskid)s %%(url)s' % (
                retried, retries), task)
            return task
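
    # Worked example with the defaults above (retries=3, DEFAULT_RETRY_DELAY):
    # a task that keeps failing is re-queued 30s after the first failure, 1h
    # after the second and 6h after the third; at the fourth failure
    # retried >= retries, next_exetime becomes -1 and the task is marked
    # FAILED.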

    def on_select_task(self, task):
        '''Called when a task is selected to fetch & process'''
        # inject information about project
        logger.info('select %(project)s:%(taskid)s %(url)s', task)

        project_info = self.projects.get(task['project'])
        assert project_info, 'no such project'
        task['group'] = project_info.get('group')
        task['project_md5sum'] = project_info.get('md5sum')
        task['project_updatetime'] = project_info.get('updatetime', 0)
        project_info['active_tasks'].appendleft((time.time(), task))
        self.send_task(task)
        return task


from tornado import gen


class OneScheduler(Scheduler):
    """
    Scheduler Mixin class for one mode

    overrides the send_task method:
    call processor.on_task(fetcher.fetch(task)) instead of consuming the queue
    """

    def _check_select(self):
        """
        interactive mode of selecting tasks
        """
        if not self.interactive:
            return super(OneScheduler, self)._check_select()

        # waiting for running tasks
        if self.running_task > 0:
            return

        is_crawled = []

        def run(project=None):
            return crawl('on_start', project=project)

        def crawl(url, project=None, **kwargs):
            """
            Crawl given url, same parameters as BaseHandler.crawl

            url - url or taskid, parameters will be used if in taskdb
            project - can be ignored if only one project exists.
            """

            # looking up the project instance
            if project is None:
                if len(self.projects) == 1:
                    project = list(self.projects.keys())[0]
                else:
                    raise LookupError('You need to specify the project: %r'
                                      % list(self.projects.keys()))
            project_data = self.processor.project_manager.get(project)
            if not project_data:
                raise LookupError('no such project: %s' % project)

            # get task package
            instance = project_data['instance']
            instance._reset()
            task = instance.crawl(url, **kwargs)
            if isinstance(task, list):
                raise Exception('url list is not allowed in interactive mode')

            # check task in taskdb
            if not kwargs:
                dbtask = self.taskdb.get_task(task['project'], task['taskid'],
                                              fields=self.request_task_fields)
                if not dbtask:
                    dbtask = self.taskdb.get_task(task['project'], task['url'],
                                                  fields=self.request_task_fields)
                if dbtask:
                    task = dbtask

            # select the task
            self.on_select_task(task)
            is_crawled.append(True)

            shell.ask_exit()

        def quit_interactive():
            '''Quit interactive mode'''
            is_crawled.append(True)
            self.interactive = False
            shell.ask_exit()

        def quit_pyspider():
            '''Close pyspider'''
            is_crawled[:] = []
            shell.ask_exit()

        shell = utils.get_python_console()
        shell.interact(
            'pyspider shell - Select task\n'
            'crawl(url, project=None, **kwargs) - same parameters as BaseHandler.crawl\n'
            'quit_interactive() - Quit interactive mode\n'
            'quit_pyspider() - Close pyspider'
        )
        if not is_crawled:
            self.ioloop.stop()

    def __getattr__(self, name):
        """patch for crawl(url, callback=self.index_page) API"""
        if self.interactive:
            return name
        raise AttributeError(name)

    def on_task_status(self, task):
        """Ignore "not processing" errors in interactive mode"""
        if not self.interactive:
            # delegate fully to the base implementation outside interactive mode
            return super(OneScheduler, self).on_task_status(task)

        try:
            process_ok = task['track']['process']['ok']
        except KeyError as e:
            logger.error("Bad status pack: %s", e)
            return None

        if process_ok:
            ret = self.on_task_done(task)
        else:
            ret = self.on_task_failed(task)
        if task['track']['fetch'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
                                       task['track']['fetch']['time'])
        if task['track']['process'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'process_time'),
                                       task['track']['process'].get('time'))
        self.projects[task['project']]['active_tasks'].appendleft((time.time(), task))
        return ret

    def init_one(self, ioloop, fetcher, processor,
                 result_worker=None, interactive=False):
        self.ioloop = ioloop
        self.fetcher = fetcher
        self.processor = processor
        self.result_worker = result_worker
        self.interactive = interactive
        self.running_task = 0

    @gen.coroutine
    def do_task(self, task):
        self.running_task += 1
        result = yield gen.Task(self.fetcher.fetch, task)
        type, task, response = result.args
        self.processor.on_task(task, response)
        # handle messages produced while processing
        while not self.processor.inqueue.empty():
            _task, _response = self.processor.inqueue.get()
            self.processor.on_task(_task, _response)
        # handle results produced while processing
        while not self.processor.result_queue.empty():
            _task, _result = self.processor.result_queue.get()
            if self.result_worker:
                self.result_worker.on_result(_task, _result)
        self.running_task -= 1
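
    # In one mode there is no separate fetcher/processor process: do_task()
    # drives the whole pipeline inline, then drains the processor's message
    # and result queues before the coroutine finishes.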

    def send_task(self, task, force=True):
        if self.fetcher.http_client.free_size() <= 0:
            if force:
                self._send_buffer.appendleft(task)
                return
            # Queue.Full mirrors the base class behaviour when not forcing
            raise Queue.Full
        self.ioloop.add_future(self.do_task(task), lambda x: x.result())

    def run(self):
        import tornado.ioloop
        tornado.ioloop.PeriodicCallback(self.run_once, 100,
                                        io_loop=self.ioloop).start()
        self.ioloop.start()

    def quit(self):
        self.ioloop.stop()
        logger.info("scheduler exiting...")


import random
import hashlib
import threading


class ThreadBaseScheduler(Scheduler):
    def __init__(self, threads=4, *args, **kwargs):
        self.threads = threads
        self.local = threading.local()

        super(ThreadBaseScheduler, self).__init__(*args, **kwargs)

        self._taskdb = self.taskdb
        self._projectdb = self.projectdb
        self._resultdb = self.resultdb

        self.thread_objs = []
        self.thread_queues = []
        self._start_threads()
        assert len(self.thread_queues) > 0
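
    # The db handles are thread-local: the properties below resolve to the
    # copy owned by the calling thread, while _taskdb/_projectdb/_resultdb
    # keep the originals that each worker clones in _thread_worker().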

    @property
    def taskdb(self):
        return self.local.taskdb

    @taskdb.setter
    def taskdb(self, taskdb):
        self.local.taskdb = taskdb

    @property
    def projectdb(self):
        return self.local.projectdb

    @projectdb.setter
    def projectdb(self, projectdb):
        self.local.projectdb = projectdb

    @property
    def resultdb(self):
        return self.local.resultdb

    @resultdb.setter
    def resultdb(self, resultdb):
        self.local.resultdb = resultdb

    def _start_threads(self):
        for i in range(self.threads):
            queue = Queue.Queue()
            thread = threading.Thread(target=self._thread_worker, args=(queue, ))
            thread.daemon = True
            thread.start()
            self.thread_objs.append(thread)
            self.thread_queues.append(queue)

    def _thread_worker(self, queue):
        self.taskdb = self._taskdb.copy()
        self.projectdb = self._projectdb.copy()
        self.resultdb = self._resultdb.copy()

        while True:
            method, args, kwargs = queue.get()
            try:
                method(*args, **kwargs)
            except Exception as e:
                logger.exception(e)

    def _run_in_thread(self, method, *args, **kwargs):
        i = kwargs.pop('_i', None)
        block = kwargs.pop('_block', False)

        if i is None:
            # no thread hint: prefer an idle worker; otherwise retry (when
            # blocking) or fall back to a random worker
            while True:
                for queue in self.thread_queues:
                    if queue.empty():
                        break
                else:
                    if block:
                        time.sleep(0.1)
                        continue
                    else:
                        queue = self.thread_queues[random.randint(0, len(self.thread_queues)-1)]
                break
        else:
            queue = self.thread_queues[i % len(self.thread_queues)]

        queue.put((method, args, kwargs))

        if block:
            self._wait_thread()

    def _wait_thread(self):
        while True:
            if all(queue.empty() for queue in self.thread_queues):
                break
            time.sleep(0.1)

    def _update_project(self, project):
        self._run_in_thread(Scheduler._update_project, self, project)

    def on_task_status(self, task):
        i = bytearray(hashlib.md5(task['taskid'].encode('utf-8')).digest())[-1]
        self._run_in_thread(Scheduler.on_task_status, self, task, _i=i)

    def on_request(self, task):
        i = bytearray(hashlib.md5(task['taskid'].encode('utf-8')).digest())[-1]
        self._run_in_thread(Scheduler.on_request, self, task, _i=i)

    def _load_put_task(self, project, taskid):
        i = bytearray(hashlib.md5(taskid.encode('utf-8')).digest())[-1]
        self._run_in_thread(Scheduler._load_put_task, self, project, taskid, _i=i)
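
    # Hashing the taskid into the `_i` hint pins all events for a given task
    # to the same worker thread, so updates for one task are applied in order
    # even though several workers run concurrently. (bytearray makes the
    # digest's last byte an int on both Python 2 and 3.)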

    def run_once(self):
        super(ThreadBaseScheduler, self).run_once()
        self._wait_thread()