Completed
Push — master ( 0e5b36...77196e )
by Roy
01:16
created

tests.TestScheduler.test_10_new_task_ignore()   A

Complexity

Conditions 1

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 8
rs 9.4285
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-08 22:37:13
7
8
import os
9
import time
10
import shutil
11
import unittest2 as unittest
12
import logging
13
import logging.config
14
logging.config.fileConfig("pyspider/logging.conf")
15
16
from pyspider.scheduler.task_queue import TaskQueue
17
from pyspider.libs import utils
18
19
20
class TestTaskQueue(unittest.TestCase):
    """Ordered scenario tests for ``TaskQueue``.

    All tests share the single queue built in ``setUpClass``. unittest runs
    test methods in alphabetical order (test_10 ... test_70), and each test
    depends on the state left behind by the previous ones.
    """

    @classmethod
    def setUpClass(cls):
        cls.task_queue = TaskQueue()
        # NOTE(review): presumably large enough that rate limiting never
        # throttles get() during these tests.
        cls.task_queue.rate = 100000
        cls.task_queue.burst = 100000
        # Tasks handed out by get() are served again once this many seconds
        # have passed (exercised by test_60).
        cls.task_queue.processing_timeout = 0.5

    def test_10_put(self):
        # Arguments are (taskid, priority[, exetime]), as the later
        # assertions demonstrate. 'a3' and 'a4' are delayed; 'a2' and 'a1'
        # are ready immediately. size() counts all four.
        self.task_queue.put('a3', 0, time.time() + 0.5)
        self.task_queue.put('a4', 3, time.time() + 0.2)
        self.task_queue.put('a2', 0)
        self.task_queue.put('a1', 1)
        self.assertEqual(self.task_queue.size(), 4)

    def test_20_update(self):
        # Re-putting an existing taskid updates it instead of duplicating it:
        # 'a2' is raised to priority 4 and 'a3' becomes due now (exetime 0).
        self.task_queue.put('a2', 4)
        self.assertEqual(self.task_queue.size(), 4)
        self.task_queue.put('a3', 2, 0)
        self.assertEqual(self.task_queue.size(), 4)

    def test_30_get_from_priority_queue(self):
        # Highest priority is served first. A fetched task still counts
        # towards size().
        self.assertEqual(self.task_queue.get(), 'a2')
        self.assertEqual(self.task_queue.size(), 4)

    def test_40_time_queue_1(self):
        # check_update() makes 'a3' (due since test_20) available.
        self.task_queue.check_update()
        self.assertEqual(self.task_queue.get(), 'a3')
        self.assertEqual(self.task_queue.size(), 4)

    def test_50_time_queue_2(self):
        # By now the delayed 'a4' (priority 3) is due and is served before
        # 'a1' (priority 1).
        time.sleep(0.3)
        self.task_queue.check_update()
        self.assertEqual(self.task_queue.get(), 'a4')
        self.assertEqual(self.task_queue.get(), 'a1')
        self.assertEqual(self.task_queue.size(), 4)

    def test_60_processing_queue(self):
        # After processing_timeout, every task handed out so far is served
        # again, in priority order.
        time.sleep(0.5)
        self.task_queue.check_update()
        self.assertEqual(self.task_queue.get(), 'a2')
        self.assertEqual(len(self.task_queue), 4)
        self.assertEqual(self.task_queue.get(), 'a4')
        self.assertEqual(self.task_queue.get(), 'a3')
        self.assertEqual(self.task_queue.get(), 'a1')
        self.assertEqual(len(self.task_queue), 4)

    def test_70_done(self):
        # done() acknowledges a task and removes it from the queue.
        self.assertTrue(self.task_queue.done('a2'))
        self.assertTrue(self.task_queue.done('a1'))
        self.assertEqual(len(self.task_queue), 2)
        self.assertTrue(self.task_queue.done('a4'))
        self.assertTrue(self.task_queue.done('a3'))
        self.assertEqual(len(self.task_queue), 0)
75
76
77
from pyspider.scheduler.token_bucket import Bucket
78
79
80
class TestBucket(unittest.TestCase):
    """Timing test for the token ``Bucket`` used for rate limiting."""

    def test_bucket(self):
        # The assertions below show the bucket starts full at 1000 tokens and
        # refills at 100 tokens/second without exceeding 1000.
        # NOTE(review): argument order (rate, burst) is inferred from that
        # arithmetic; confirm against Bucket's signature.
        bucket = Bucket(100, 1000)
        self.assertEqual(bucket.get(), 1000)
        # Already full: waiting must not push it past the cap.
        time.sleep(0.1)
        self.assertEqual(bucket.get(), 1000)
        bucket.desc(100)
        self.assertEqual(bucket.get(), 900)
        # 0.1s at 100 tokens/s refills ~10 tokens per step. The delta absorbs
        # sleep jitter.
        time.sleep(0.1)
        self.assertAlmostEqual(bucket.get(), 910, delta=2)
        time.sleep(0.1)
        self.assertAlmostEqual(bucket.get(), 920, delta=2)
93
94
95
try:
96
    from six.moves import xmlrpc_client
97
except ImportError:
98
    import xmlrpclib as xmlrpc_client
99
from pyspider.scheduler.scheduler import Scheduler
100
from pyspider.database.sqlite import taskdb, projectdb, resultdb
101
from pyspider.libs.multiprocessing_queue import Queue
102
from pyspider.libs.utils import run_in_thread
103
104
105
class TestScheduler(unittest.TestCase):
    """End-to-end scenario tests for ``Scheduler``.

    ``setUpClass`` starts a real scheduler and its XML-RPC server in a
    background thread, backed by sqlite databases under ``./data/tests``.
    The tests drive it through ``newtask_queue`` / ``status_queue``, observe
    ``scheduler2fetcher``, and query it over XML-RPC. unittest runs the tests
    alphabetically (test_10 ... test_z20), and each one builds on the state
    left by the previous ones.
    """
    taskdb_path = './data/tests/task.db'
    projectdb_path = './data/tests/project.db'
    resultdb_path = './data/tests/result.db'
    check_project_time = 1
    scheduler_xmlrpc_port = 23333

    @classmethod
    def setUpClass(cls):
        shutil.rmtree('./data/tests', ignore_errors=True)
        os.makedirs('./data/tests')

        # The test and the scheduler thread each get their own DB objects;
        # these factories are reused by run_scheduler below.
        def get_taskdb():
            return taskdb.TaskDB(cls.taskdb_path)
        cls.taskdb = get_taskdb()

        def get_projectdb():
            return projectdb.ProjectDB(cls.projectdb_path)
        cls.projectdb = get_projectdb()

        def get_resultdb():
            return resultdb.ResultDB(cls.resultdb_path)
        cls.resultdb = get_resultdb()

        cls.newtask_queue = Queue(10)
        cls.status_queue = Queue(10)
        cls.scheduler2fetcher = Queue(10)
        cls.rpc = xmlrpc_client.ServerProxy(
            'http://localhost:%d' % cls.scheduler_xmlrpc_port)

        def run_scheduler():
            scheduler = Scheduler(taskdb=get_taskdb(), projectdb=get_projectdb(),
                                  newtask_queue=cls.newtask_queue,
                                  status_queue=cls.status_queue,
                                  out_queue=cls.scheduler2fetcher,
                                  data_path="./data/tests/",
                                  resultdb=get_resultdb())
            # Short intervals so the tests do not wait long for the loop.
            scheduler.UPDATE_PROJECT_INTERVAL = 0.1
            scheduler.LOOP_INTERVAL = 0.1
            scheduler.INQUEUE_LIMIT = 10  # verified by test_x10_inqueue_limit
            scheduler.DELETE_TIME = 0  # test_x20 expects deletion within ~1s
            scheduler.DEFAULT_RETRY_DELAY = {'': 5}  # test_60 waits on this
            scheduler._last_tick = int(time.time())  # not dispatch cronjob
            cls.xmlrpc_thread = run_in_thread(scheduler.xmlrpc_run,
                                              port=cls.scheduler_xmlrpc_port)
            scheduler.run()

        cls.process = run_in_thread(run_scheduler)
        # Give the scheduler thread and its XML-RPC server time to start.
        time.sleep(1)

    @classmethod
    def tearDownClass(cls):
        if cls.process.is_alive():
            cls.rpc._quit()
            cls.process.join(5)
        # Bounded join: an unbounded join would hang the whole test run if the
        # XML-RPC server failed to stop. The port check below then fails loudly.
        cls.xmlrpc_thread.join(5)
        assert not cls.process.is_alive()
        shutil.rmtree('./data/tests', ignore_errors=True)
        time.sleep(1)

        # None of these ports may be left open after shutdown.
        assert not utils.check_port_open(5000)
        assert not utils.check_port_open(cls.scheduler_xmlrpc_port)
        assert not utils.check_port_open(24444)
        assert not utils.check_port_open(25555)

    def _assert_no_task_dispatched(self, timeout):
        """Assert that nothing reaches ``scheduler2fetcher`` within ``timeout`` seconds."""
        # Imported as ``queue`` so it does not shadow the module-level
        # ``Queue`` class used in setUpClass.
        from six.moves import queue
        with self.assertRaises(queue.Empty):
            self.scheduler2fetcher.get(timeout=timeout)

    def test_10_new_task_ignore(self):
        # 'test_project' does not exist yet (created in test_20), so the task
        # must be dropped.
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url'
        })
        # Let the scheduler loop consume the task before asserting. Otherwise
        # the assertions pass trivially.
        time.sleep(0.1)
        self.assertEqual(self.rpc.size(), 0)
        self.assertEqual(len(self.rpc.get_active_tasks()), 0)

    def test_20_new_project(self):
        # Created in TODO status; test_30 checks nothing is dispatched for it.
        self.projectdb.insert('test_project', {
            'name': 'test_project',
            'group': 'group',
            'status': 'TODO',
            'script': 'import time\nprint(time.time())',
            'comments': 'test project',
            'rate': 1.0,
            'burst': 10,
        })

    def test_30_update_project(self):
        # Nothing is dispatched while the project is in TODO...
        self._assert_no_task_dispatched(timeout=1)
        self.projectdb.update('test_project', status="DEBUG")
        time.sleep(0.1)
        self.rpc.update_project()

        # ...but switching to DEBUG dispatches the `_on_get_info` task.
        task = self.scheduler2fetcher.get(timeout=10)
        self.assertIsNotNone(task)
        self.assertEqual(task['url'], 'data:,_on_get_info')

    def test_34_new_not_used_project(self):
        # A newly inserted RUNNING project also gets its `_on_get_info` task.
        self.projectdb.insert('test_project_not_started', {
            'name': 'test_project_not_started',
            'group': 'group',
            'status': 'RUNNING',
            'script': 'import time\nprint(time.time())',
            'comments': 'test project',
            'rate': 1.0,
            'burst': 10,
        })
        task = self.scheduler2fetcher.get(timeout=1)
        self.assertEqual(task['taskid'], '_on_get_info')

    def test_35_new_task(self):
        time.sleep(0.2)
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 0,
            },
        })

        time.sleep(0.5)
        task = self.scheduler2fetcher.get(timeout=10)
        self.assertGreater(len(self.rpc.get_active_tasks()), 0)
        self.assertIsNotNone(task)
        self.assertEqual(task['project'], 'test_project')
        self.assertIn('schedule', task)
        self.assertIn('fetch', task)
        self.assertIn('process', task)
        self.assertIn('track', task)
        self.assertEqual(task['fetch']['data'], 'abc')

    def test_37_force_update_processing_task(self):
        # Force-updating a task that is still being processed must not block
        # the tests that follow.
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url_force_update',
            'schedule': {
                'age': 10,
                'force_update': True,
            },
        })
        time.sleep(0.2)

    def test_40_taskdone_error_no_project(self):
        # A status for an unknown project is ignored: the queue size is unchanged.
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'no_project',
            'url': 'url'
        })
        time.sleep(0.1)
        self.assertEqual(self.rpc.size(), 1)

    def test_50_taskdone_error_no_track(self):
        # A status without `track`, or with an empty one, is ignored as well.
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url'
        })
        time.sleep(0.1)
        self.assertEqual(self.rpc.size(), 1)
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'track': {}
        })
        time.sleep(0.1)
        self.assertEqual(self.rpc.size(), 1)

    def test_60_taskdone_failed_retry(self):
        # A failed process step is retried after DEFAULT_RETRY_DELAY (5s):
        # nothing within 4s, then the task within the next 5s.
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': False
                },
            }
        })
        self._assert_no_task_dispatched(timeout=4)
        task = self.scheduler2fetcher.get(timeout=5)
        self.assertIsNotNone(task)

    def test_70_taskdone_ok(self):
        # A successful status removes the task from the queue.
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': True
                },
            }
        })
        time.sleep(0.2)
        self.assertEqual(self.rpc.size(), 0)

    def test_80_newtask_age_ignore(self):
        # The task was just crawled, so age=30 means it is not due again.
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 30,
            },
        })
        time.sleep(0.1)
        self.assertEqual(self.rpc.size(), 0)

    def test_82_newtask_via_rpc(self):
        # Same as test_80, but submitted through XML-RPC.
        self.rpc.newtask({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 30,
            },
        })
        time.sleep(0.1)
        self.assertEqual(self.rpc.size(), 0)

    def test_90_newtask_with_itag(self):
        # A changed itag triggers a recrawl regardless of age.
        time.sleep(0.1)
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'itag': "abc",
                'retries': 1
            },
        })
        task = self.scheduler2fetcher.get(timeout=10)
        self.assertIsNotNone(task)

        self.test_70_taskdone_ok()

    def test_a10_newtask_restart_by_age(self):
        # age=0 means the task is always due again.
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 0,
                'retries': 1
            },
        })
        task = self.scheduler2fetcher.get(timeout=10)
        self.assertIsNotNone(task)

    def test_a20_failed_retry(self):
        # With retries=1 the first failure is retried...
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': False
                },
            }
        })
        task = self.scheduler2fetcher.get(timeout=5)
        self.assertIsNotNone(task)

        # ...but the second failure is not.
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'track': {
                'fetch': {
                    'ok': False
                },
                'process': {
                    'ok': False
                },
            }
        })

        self._assert_no_task_dispatched(timeout=5)

    def test_a30_task_verify(self):
        # taskid, project and url are all required, and the project must exist.
        self.assertFalse(self.rpc.newtask({
            # no 'taskid'
            'project': 'test_project',
            'url': 'url',
        }))
        self.assertFalse(self.rpc.newtask({
            'taskid': 'taskid#',
            # no 'project'
            'url': 'url',
        }))
        self.assertFalse(self.rpc.newtask({
            'taskid': 'taskid#',
            'project': 'test_project',
            # no 'url'
        }))
        self.assertFalse(self.rpc.newtask({
            'taskid': 'taskid#',
            'project': 'not_exist_project',
            'url': 'url',
        }))
        self.assertTrue(self.rpc.newtask({
            'taskid': 'taskid#',
            'project': 'test_project',
            'url': 'url',
        }))

    def test_a40_success_recrawl(self):
        # With auto_recrawl, a successful task is dispatched again.
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 0,
                'retries': 1,
                'auto_recrawl': True,
            },
        })
        task = self.scheduler2fetcher.get(timeout=10)
        self.assertIsNotNone(task)

        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'schedule': {
                'age': 0,
                'retries': 1,
                'auto_recrawl': True,
            },
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': True
                },
            }
        })
        task = self.scheduler2fetcher.get(timeout=10)
        self.assertIsNotNone(task)

    def test_a50_failed_recrawl(self):
        # With auto_recrawl, a failing task keeps being dispatched.
        for _ in range(3):
            self.status_queue.put({
                'taskid': 'taskid',
                'project': 'test_project',
                'url': 'url',
                'schedule': {
                    'age': 0,
                    'retries': 1,
                    'auto_recrawl': True,
                },
                'track': {
                    'fetch': {
                        'ok': True
                    },
                    'process': {
                        'ok': False
                    },
                }
            })
            task = self.scheduler2fetcher.get(timeout=10)
            self.assertIsNotNone(task)

    def test_a60_disable_recrawl(self):
        # Without auto_recrawl in the status, a success is not recrawled.
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'schedule': {
                'age': 0,
                'retries': 1,
            },
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': True
                },
            }
        })

        self._assert_no_task_dispatched(timeout=5)

    def test_x10_inqueue_limit(self):
        # With rate/burst 0, nothing is dispatched, and INQUEUE_LIMIT (10)
        # caps how many of the 20 new tasks are queued.
        self.projectdb.insert('test_inqueue_project', {
            'name': 'test_inqueue_project',
            'group': 'group',
            'status': 'DEBUG',
            'script': 'import time\nprint(time.time())',
            'comments': 'test project',
            'rate': 0,
            'burst': 0,
        })
        time.sleep(0.1)
        pre_size = self.rpc.size()
        for i in range(20):
            self.newtask_queue.put({
                'taskid': 'taskid%d' % i,
                'project': 'test_inqueue_project',
                'url': 'url',
                'schedule': {
                    'age': 3000,
                    'force_update': True,
                },
            })
        time.sleep(1)
        self.assertEqual(self.rpc.size() - pre_size, 10)

    def test_x20_delete_project(self):
        # A STOPped project in group "delete" is removed together with its
        # tasks and counters.
        self.assertIsNotNone(self.projectdb.get('test_inqueue_project'))
        self.projectdb.update('test_inqueue_project', status="STOP", group="lock,delete")
        time.sleep(1)
        self.assertIsNone(self.projectdb.get('test_inqueue_project'))
        self.taskdb._list_project()
        self.assertIsNone(self.taskdb.get_task('test_inqueue_project', 'taskid1'))
        self.assertNotIn('test_inqueue_project', self.rpc.counter('5m', 'sum'))

    def test_z10_startup(self):
        self.assertTrue(self.process.is_alive())

    def test_z20_quit(self):
        self.rpc._quit()
        # Wait (bounded) for the scheduler thread to exit instead of sleeping
        # a fixed 0.2s, which was racy on slow machines.
        self.process.join(5)
        self.assertFalse(self.process.is_alive())
        self.assertEqual(
            self.taskdb.get_task('test_project', 'taskid')['status'],
            self.taskdb.SUCCESS
        )
587
588
# Allow running this module directly: `python <this file>`.
if __name__ == '__main__':
    unittest.main()
590