Completed
Push — master ( 39eece...c8d455 )
by Roy
01:11
created

TestScheduler.test_a40_success_recrawl()   B

Complexity

Conditions 1

Size

Total Lines 46

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 46
rs 8.9411
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-08 22:37:13
7
8
import os
9
import time
10
import shutil
11
import unittest2 as unittest
12
import logging
13
import logging.config
14
logging.config.fileConfig("pyspider/logging.conf")
15
16
from pyspider.scheduler.task_queue import TaskQueue
17
from pyspider.libs import utils
18
19
20
class TestTaskQueue(unittest.TestCase):
    '''
    Ordered scenario for TaskQueue.

    Every test works on the single queue built in setUpClass, so each
    method relies on the state left behind by the lower-numbered ones.
    '''

    @classmethod
    def setUpClass(cls):
        '''Create the shared queue: rate/burst 100000, processing_timeout 0.5.'''
        queue = TaskQueue()
        queue.rate = 100000
        queue.burst = 100000
        queue.processing_timeout = 0.5
        cls.task_queue = queue

    def test_10_put(self):
        '''Put four distinct task ids and expect size() == 4.'''
        queue = self.task_queue
        # NOTE(review): the 2nd/3rd positional arguments look like a priority
        # and an absolute execution time -- confirm against TaskQueue.put.
        queue.put('a3', 0, time.time() + 0.5)
        queue.put('a4', 3, time.time() + 0.2)
        queue.put('a2', 0)
        queue.put('a1', 1)
        self.assertEqual(queue.size(), 4)

    def test_20_update(self):
        '''Put ids that are already queued; size() must stay at 4.'''
        queue = self.task_queue
        for put_args in (('a2', 4), ('a3', 2, 0)):
            queue.put(*put_args)
            self.assertEqual(queue.size(), 4)

    def test_30_get_from_priority_queue(self):
        '''get() hands out 'a2' first; size() still reports 4 afterwards.'''
        queue = self.task_queue
        self.assertEqual(queue.get(), 'a2')
        self.assertEqual(queue.size(), 4)

    def test_40_time_queue_1(self):
        '''After check_update(), 'a3' is the next id returned by get().'''
        queue = self.task_queue
        queue.check_update()
        self.assertEqual(queue.get(), 'a3')
        self.assertEqual(queue.size(), 4)

    def test_50_time_queue_2(self):
        '''After 0.3s and check_update(), get() yields 'a4' and then 'a1'.'''
        queue = self.task_queue
        time.sleep(0.3)
        queue.check_update()
        for expected in ('a4', 'a1'):
            self.assertEqual(queue.get(), expected)
        self.assertEqual(queue.size(), 4)

    def test_60_processing_queue(self):
        '''
        After 0.5s (the processing_timeout set in setUpClass) and
        check_update(), all four ids are handed out again.
        '''
        queue = self.task_queue
        time.sleep(0.5)
        queue.check_update()
        self.assertEqual(queue.get(), 'a2')
        self.assertEqual(len(queue), 4)
        for expected in ('a4', 'a3', 'a1'):
            self.assertEqual(queue.get(), expected)
        self.assertEqual(len(queue), 4)

    def test_70_done(self):
        '''done() returns a truthy value per id and shrinks len() down to 0.'''
        queue = self.task_queue
        for remaining, finished in ((2, ('a2', 'a1')), (0, ('a4', 'a3'))):
            for taskid in finished:
                self.assertTrue(queue.done(taskid))
            self.assertEqual(len(queue), remaining)
75
76
77
from pyspider.scheduler.token_bucket import Bucket
78
79
80
class TestBucket(unittest.TestCase):
    '''Checks Bucket.get() before and after desc(), and after short sleeps.'''

    def test_bucket(self):
        '''
        A fresh Bucket(100, 1000) reports 1000, stays at 1000 after 0.1s,
        drops to 900 after desc(100), then grows by about 10 per 0.1s.
        '''
        rate, burst = 100, 1000
        bucket = Bucket(rate, burst)
        self.assertEqual(bucket.get(), 1000)
        time.sleep(0.1)
        # Already full: waiting must not push the value past 1000.
        self.assertEqual(bucket.get(), 1000)

        bucket.desc(100)
        self.assertEqual(bucket.get(), 900)

        # Timing-based refill, hence the tolerance of 2.
        for expected in (910, 920):
            time.sleep(0.1)
            self.assertAlmostEqual(bucket.get(), expected, delta=2)
93
94
95
try:
96
    from six.moves import xmlrpc_client
97
except ImportError:
98
    import xmlrpclib as xmlrpc_client
99
from pyspider.scheduler.scheduler import Scheduler
100
from pyspider.database.sqlite import taskdb, projectdb, resultdb
101
from pyspider.libs.multiprocessing_queue import Queue
102
from pyspider.libs.utils import run_in_thread
103
104
105
class TestScheduler(unittest.TestCase):
106
    taskdb_path = './data/tests/task.db'
107
    projectdb_path = './data/tests/project.db'
108
    resultdb_path = './data/tests/result.db'
109
    check_project_time = 1
110
    scheduler_xmlrpc_port = 23333
111
112
    @classmethod
    def setUpClass(cls):
        '''
        Prepare ./data/tests and start a Scheduler in a background thread.

        Stores on the class: taskdb / projectdb / resultdb handles, the three
        queues wired into the scheduler, an XML-RPC proxy to it (rpc), the
        thread running it (process) and, once that thread has started, the
        thread serving XML-RPC (xmlrpc_thread).
        '''
        shutil.rmtree('./data/tests', ignore_errors=True)
        os.makedirs('./data/tests')

        # Factories instead of shared objects: the scheduler below is given
        # its own database instances, separate from the ones the tests use.
        def get_taskdb():
            return taskdb.TaskDB(cls.taskdb_path)

        def get_projectdb():
            return projectdb.ProjectDB(cls.projectdb_path)

        def get_resultdb():
            return resultdb.ResultDB(cls.resultdb_path)

        cls.taskdb = get_taskdb()
        cls.projectdb = get_projectdb()
        cls.resultdb = get_resultdb()

        cls.newtask_queue = Queue(10)
        cls.status_queue = Queue(10)
        cls.scheduler2fetcher = Queue(10)
        rpc_url = 'http://localhost:%d' % cls.scheduler_xmlrpc_port
        cls.rpc = xmlrpc_client.ServerProxy(rpc_url)

        def run_scheduler():
            scheduler = Scheduler(
                taskdb=get_taskdb(),
                projectdb=get_projectdb(),
                newtask_queue=cls.newtask_queue,
                status_queue=cls.status_queue,
                out_queue=cls.scheduler2fetcher,
                data_path="./data/tests/",
                resultdb=get_resultdb(),
            )
            # Short intervals and small limits so the tests can observe
            # scheduler reactions within fractions of a second.
            scheduler.UPDATE_PROJECT_INTERVAL = 0.1
            scheduler.LOOP_INTERVAL = 0.1
            scheduler.INQUEUE_LIMIT = 10
            scheduler.DELETE_TIME = 0
            scheduler.DEFAULT_RETRY_DELAY = {'': 5}
            scheduler._last_tick = int(time.time())  # not dispatch cronjob
            cls.xmlrpc_thread = run_in_thread(
                scheduler.xmlrpc_run, port=cls.scheduler_xmlrpc_port)
            scheduler.run()

        cls.process = run_in_thread(run_scheduler)
        # Give the scheduler and its XML-RPC server a moment to come up.
        time.sleep(1)
150
151 View Code Duplication
    @classmethod
    def tearDownClass(cls):
        '''
        Stop the scheduler thread, remove ./data/tests and check that the
        ports probed below are no longer open.
        '''
        worker = cls.process
        if worker.is_alive():
            # Ask the scheduler to quit over RPC, then wait up to 5s for it.
            cls.rpc._quit()
            worker.join(5)
        cls.xmlrpc_thread.join()
        assert not worker.is_alive()
        shutil.rmtree('./data/tests', ignore_errors=True)
        time.sleep(1)

        port_open = utils.check_port_open
        assert not port_open(5000)
        assert not port_open(cls.scheduler_xmlrpc_port)
        assert not port_open(24444)
        assert not port_open(25555)
165
166
    def test_10_new_task_ignore(self):
        '''
        A task for a project that does not exist yet must not be queued.

        task_queue = [ ]
        '''
        orphan_task = {
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url'
        }
        self.newtask_queue.put(orphan_task)  # unknown project: test_project
        self.assertEqual(self.rpc.size(), 0)
        active_tasks = self.rpc.get_active_tasks()
        self.assertEqual(len(active_tasks), 0)
177
178
    def test_20_new_project(self):
        '''
        Insert 'test_project' into projectdb with status TODO.

        task_queue = [ ]
        '''
        name = 'test_project'
        project = {
            'name': name,
            'group': 'group',
            'status': 'TODO',
            'script': 'import time\nprint(time.time())',
            'comments': 'test project',
            'rate': 1.0,
            'burst': 10,
        }
        self.projectdb.insert(name, project)
191
192
    def test_30_update_project(self):
        '''
        Nothing is dispatched while the project is TODO; after switching it
        to DEBUG and forcing a project update, '_on_get_info' is dispatched.

        task_queue = [ ]
        '''
        # Imported under its own name so it does not shadow the module-level
        # Queue class used for the scheduler queues.
        from six.moves import queue
        with self.assertRaises(queue.Empty):
            # Expected to time out, so there is no result to bind.
            self.scheduler2fetcher.get(timeout=1)
        self.projectdb.update('test_project', status="DEBUG")
        time.sleep(0.1)
        self.rpc.update_project()

        task = self.scheduler2fetcher.get(timeout=10)
        self.assertIsNotNone(task)
        self.assertEqual(task['taskid'], '_on_get_info')  # select test_project:_on_get_info data:,_on_get_info
206
207
    def test_34_new_not_used_project(self):
        '''
        Inserting a project that is already RUNNING triggers '_on_get_info'.

        task_queue = []
        '''
        name = 'test_project_not_started'
        project = {
            'name': name,
            'group': 'group',
            'status': 'RUNNING',
            'script': 'import time\nprint(time.time())',
            'comments': 'test project',
            'rate': 1.0,
            'burst': 10,
        }
        self.projectdb.insert(name, project)
        # select test_project_not_started:_on_get_info data:,_on_get_info
        dispatched = self.scheduler2fetcher.get(timeout=1)
        self.assertEqual(dispatched['taskid'], '_on_get_info')
222
223
    def test_35_new_task(self):
        '''
        A new task for test_project is dispatched to the fetcher queue with
        its schedule / fetch / process sections plus a 'track' section.

        task_queue = [ ]
        '''
        time.sleep(0.2)
        new_task = {
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 0,
            },
        }
        self.newtask_queue.put(new_task)  # new task test_project:taskid url
        # task_queue = [ test_project:taskid ]

        time.sleep(0.5)
        task = self.scheduler2fetcher.get(timeout=10)  # select test_project:taskid
        self.assertGreater(len(self.rpc.get_active_tasks()), 0)
        self.assertIsNotNone(task)
        self.assertEqual(task['taskid'], 'taskid')
        self.assertEqual(task['project'], 'test_project')
        for section in ('schedule', 'fetch', 'process', 'track'):
            self.assertIn(section, task)
        self.assertEqual(task['fetch']['data'], 'abc')
255
256
    def test_37_force_update_processing_task(self):
        '''
        Re-submit the task that is currently being processed with
        force_update set; this test only submits and waits, the following
        tests verify that nothing got blocked.

        processing = [ test_project:taskid ]
        '''
        forced_task = {
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url_force_update',
            'schedule': {
                'age': 10,
                'force_update': True,
            },
        }
        # restart task test_project:taskid url_force_update
        self.newtask_queue.put(forced_task)
        time.sleep(0.2)
        # it should not block next
271
272
    def test_40_taskdone_error_no_project(self):
        '''
        A status pack for an unknown project leaves the queue size at 1.

        processing = [ test_project:taskid ]
        '''
        stray_status = {
            'taskid': 'taskid',
            'project': 'no_project',
            'url': 'url'
        }
        self.status_queue.put(stray_status)  # unknown project: no_project
        time.sleep(0.1)
        self.assertEqual(self.rpc.size(), 1)
283
284
    def test_50_taskdone_error_no_track(self):
        '''
        Status packs without 'track', or with an empty 'track', leave the
        queue size at 1.

        processing = [ test_project:taskid ]
        '''
        bad_packs = (
            # Bad status pack: 'track'
            {
                'taskid': 'taskid',
                'project': 'test_project',
                'url': 'url'
            },
            # Bad status pack: 'process'
            {
                'taskid': 'taskid',
                'project': 'test_project',
                'url': 'url',
                'track': {}
            },
        )
        for pack in bad_packs:
            self.status_queue.put(pack)
            time.sleep(0.1)
            self.assertEqual(self.rpc.size(), 1)
303
304
    def test_60_taskdone_failed_retry(self):
        '''
        A failed 'process' step schedules a retry: nothing is dispatched for
        the first 4 seconds, then the task shows up within the next 5.

        processing = [ test_project:taskid ]
        '''
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': False
                },
            }
        })  # task retry 0/3 test_project:taskid url
        # Imported under its own name so it does not shadow the module-level
        # Queue class used for the scheduler queues.
        from six.moves import queue
        with self.assertRaises(queue.Empty):
            # Expected to time out, so there is no result to bind.
            self.scheduler2fetcher.get(timeout=4)
        task = self.scheduler2fetcher.get(timeout=5)  # select test_project:taskid url
        self.assertIsNotNone(task)
326
327
    def test_70_taskdone_ok(self):
        '''
        A status pack with fetch and process both ok empties the queue.

        Also invoked as a helper by test_90_newtask_with_itag.

        processing = [ test_project:taskid ]
        '''
        success_status = {
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': True
                },
            }
        }
        self.status_queue.put(success_status)  # task done test_project:taskid url
        time.sleep(0.2)
        self.assertEqual(self.rpc.size(), 0)
346
347
    def test_75_on_finished_msg(self):
        '''
        The next dispatched task must be 'on_finished'.

        Also invoked as a helper by other tests in this class.
        '''
        # select test_project:on_finished data:,on_finished
        finished_msg = self.scheduler2fetcher.get(timeout=5)

        self.assertEqual(finished_msg['taskid'], 'on_finished')
351
352
    def test_80_newtask_age_ignore(self):
        '''
        Re-submitting the finished task with age 30 does not queue it again.

        processing = [ ]
        '''
        repeat_task = {
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 30,
            },
        }
        self.newtask_queue.put(repeat_task)
        time.sleep(0.1)
        self.assertEqual(self.rpc.size(), 0)
372
373
    def test_82_newtask_via_rpc(self):
        '''
        Same submission as test_80_newtask_age_ignore, but sent through the
        XML-RPC newtask call; the queue must stay empty as well.

        processing = [ ]
        '''
        repeat_task = {
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 30,
            },
        }
        self.rpc.newtask(repeat_task)
        time.sleep(0.1)
        self.assertEqual(self.rpc.size(), 0)
393
394
    def test_90_newtask_with_itag(self):
        '''
        Submitting the task with an itag gets it dispatched again; it is then
        completed by re-running the taskdone / on_finished checks.

        task_queue = [ ]
        processing = [ ]
        '''
        time.sleep(0.1)
        tagged_task = {
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'itag': "abc",
                'retries': 1
            },
        }
        self.newtask_queue.put(tagged_task)  # restart task test_project:taskid url

        dispatched = self.scheduler2fetcher.get(timeout=10)  # select test_project:taskid url
        self.assertIsNotNone(dispatched)
        self.assertEqual(dispatched['taskid'], 'taskid')

        self.test_70_taskdone_ok()  # task done test_project:taskid url
        self.test_75_on_finished_msg()  # select test_project:on_finished data:,on_finished
422
423
    def test_a10_newtask_restart_by_age(self):
        '''Submitting the task with age 0 gets it dispatched again.'''
        restart_task = {
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 0,
                'retries': 1
            },
        }
        self.newtask_queue.put(restart_task)  # restart task test_project:taskid url
        dispatched = self.scheduler2fetcher.get(timeout=10)  # select test_project:taskid url
        self.assertIsNotNone(dispatched)
        self.assertEqual(dispatched['taskid'], 'taskid')
442
443
    def test_a20_failed_retry(self):
        '''
        The first failure is retried; the second failure is followed by
        'on_finished' and then nothing more is dispatched for 5 seconds.

        processing: [ test_project:taskid ]
        '''
        def status_pack(fetch_ok, process_ok):
            # Status report for test_project:taskid with the given outcomes.
            return {
                'taskid': 'taskid',
                'project': 'test_project',
                'url': 'url',
                'track': {
                    'fetch': {
                        'ok': fetch_ok
                    },
                    'process': {
                        'ok': process_ok
                    },
                }
            }

        self.status_queue.put(status_pack(True, False))  # task retry 0/1 test_project:taskid url
        retried = self.scheduler2fetcher.get(timeout=5)  # select test_project:taskid url
        self.assertIsNotNone(retried)
        self.assertEqual(retried['taskid'], 'taskid')

        self.status_queue.put(status_pack(False, False))  # task failed test_project:taskid url

        self.test_75_on_finished_msg()  # select test_project:on_finished data:,on_finished

        from six.moves import queue as Queue
        with self.assertRaises(Queue.Empty):
            self.scheduler2fetcher.get(timeout=5)
483
484
    def test_a30_task_verify(self):
        '''
        rpc.newtask returns a falsy value for tasks missing taskid, project
        or url, or naming an unknown project, and a truthy value for a
        complete task.
        '''
        rejected_tasks = (
            # taskid not in task: {'project': 'test_project', 'url': 'url'}
            {
                'project': 'test_project',
                'url': 'url',
            },
            # project not in task: {'url': 'url', 'taskid': 'taskid#'}
            {
                'taskid': 'taskid#',
                'url': 'url',
            },
            # url not in task: {'project': 'test_project', 'taskid': 'taskid#'}
            {
                'taskid': 'taskid#',
                'project': 'test_project',
            },
            # unknown project: not_exist_project
            {
                'taskid': 'taskid#',
                'project': 'not_exist_project',
                'url': 'url',
            },
        )
        for bad_task in rejected_tasks:
            self.assertFalse(self.rpc.newtask(bad_task))

        complete_task = {
            'taskid': 'taskid#',
            'project': 'test_project',
            'url': 'url',
        }
        self.assertTrue(self.rpc.newtask(complete_task))  # new task test_project:taskid# url
510
511
    def test_a40_success_recrawl(self):
        '''
        With auto_recrawl set, a successfully finished task is dispatched
        again.

        task_queue = [ test_project:taskid# ]
        '''
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 0,
                'retries': 1,
                'auto_recrawl': True,
            },
        })  # restart task test_project:taskid url
        task1 = self.scheduler2fetcher.get(timeout=10)  # select test_project:taskid# url
        task2 = self.scheduler2fetcher.get(timeout=10)  # select test_project:taskid url
        self.assertIsNotNone(task1)
        self.assertIsNotNone(task2)
        # The two tasks may arrive in either order; assertIn reports the ids
        # that were actually seen if 'taskid#' is missing.
        self.assertIn('taskid#', (task1['taskid'], task2['taskid']))

        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'schedule': {
                'age': 0,
                'retries': 1,
                'auto_recrawl': True,
            },
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': True
                },
            }
        })  # task done test_project:taskid url
        task = self.scheduler2fetcher.get(timeout=10)
        self.assertIsNotNone(task)
557
558
    def test_a50_failed_recrawl(self):
        '''
        With auto_recrawl set, each of three failed status reports is
        followed by the task being dispatched again.

        time_queue = [ test_project:taskid ]
        scheduler2fetcher = [ test_project:taskid# ]
        processing = [ test_project:taskid# ]
        '''
        failed_status = {
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'schedule': {
                'age': 0,
                'retries': 1,
                'auto_recrawl': True,
            },
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': False
                },
            }
        }
        for _ in range(3):
            self.status_queue.put(failed_status)
            # not processing pack: test_project:taskid url
            # select test_project:taskid url
            # task retry 0/1 test_project:taskid url
            # select test_project:taskid url
            # task retry 0/1 test_project:taskid url
            # select test_project:taskid url
            dispatched = self.scheduler2fetcher.get(timeout=10)
            self.assertIsNotNone(dispatched)
            self.assertEqual(dispatched['taskid'], 'taskid')
592
593
    def test_a60_disable_recrawl(self):
        '''
        A successful status report without auto_recrawl ends the recrawl
        cycle: nothing is dispatched for the next 5 seconds.

        time_queue = [ test_project:taskid ]
        scheduler2fetcher = [ test_project:taskid# ]
        processing = [ test_project:taskid# ]
        '''
        final_status = {
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'schedule': {
                'age': 0,
                'retries': 1,
            },
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': True
                },
            }
        }
        self.status_queue.put(final_status)  # task done test_project:taskid url

        from six.moves import queue as Queue
        with self.assertRaises(Queue.Empty):
            self.scheduler2fetcher.get(timeout=5)
620
621
    def test_x10_inqueue_limit(self):
        '''
        Of 20 tasks submitted to a project with rate 0 and burst 0, only 10
        are added to the queue (INQUEUE_LIMIT is set to 10 in setUpClass).
        '''
        name = 'test_inqueue_project'
        self.projectdb.insert(name, {
            'name': name,
            'group': 'group',
            'status': 'DEBUG',
            'script': 'import time\nprint(time.time())',
            'comments': 'test project',
            'rate': 0,
            'burst': 0,
        })
        time.sleep(0.1)
        size_before = self.rpc.size()
        for index in range(20):
            task = {
                'taskid': 'taskid%d' % index,
                'project': name,
                'url': 'url',
                'schedule': {
                    'age': 3000,
                    'force_update': True,
                },
            }
            self.newtask_queue.put(task)
        time.sleep(1)
        self.assertEqual(self.rpc.size() - size_before, 10)
645
646
    def test_x20_delete_project(self):
        '''
        Setting status STOP and group "lock,delete" on a project makes it
        disappear from projectdb, taskdb and the rpc counters within 1s.
        '''
        name = 'test_inqueue_project'
        self.assertIsNotNone(self.projectdb.get(name))
        #self.assertIsNotNone(self.taskdb.get_task('test_inqueue_project', 'taskid1'))
        self.projectdb.update(name, status="STOP", group="lock,delete")
        time.sleep(1)
        self.assertIsNone(self.projectdb.get(name))
        # NOTE(review): presumably refreshes taskdb's view of the existing
        # projects before the lookup below -- confirm against TaskDB.
        self.taskdb._list_project()
        self.assertIsNone(self.taskdb.get_task(name, 'taskid1'))
        counters = self.rpc.counter('5m', 'sum')
        self.assertNotIn(name, counters)
655
656
    def test_z10_startup(self):
        '''The scheduler thread started in setUpClass is still running.'''
        scheduler_thread = self.process
        self.assertTrue(scheduler_thread.is_alive())
658
659
    def test_z20_quit(self):
        '''
        After rpc._quit() the scheduler thread stops within 0.2s, and
        test_project:taskid is stored with status SUCCESS.
        '''
        self.rpc._quit()
        time.sleep(0.2)
        self.assertFalse(self.process.is_alive())
        stored_task = self.taskdb.get_task('test_project', 'taskid')
        self.assertEqual(stored_task['status'], self.taskdb.SUCCESS)
667
668
if __name__ == '__main__':
    # Run every test case in this module when it is executed as a script.
    unittest.main()
670