TestScheduler.test_60_taskdone_failed_retry()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 1
c 1
b 0
f 1
dl 0
loc 22
rs 9.2
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-08 22:37:13
7
8
import os
9
import time
10
import shutil
11
import unittest2 as unittest
12
import logging
13
import logging.config
14
logging.config.fileConfig("pyspider/logging.conf")
15
16
from pyspider.scheduler.task_queue import TaskQueue
17
from pyspider.libs import utils
18
19
20
class TestTaskQueue(unittest.TestCase):
    '''
    Ordered scenario run against one shared TaskQueue instance.

    The methods are numbered because they all operate on the single queue
    built in setUpClass and each one relies on the state left behind by the
    previous one (unittest executes test methods in name order).
    '''

    @classmethod
    def setUpClass(cls):
        '''Create the shared queue with very high rate/burst and a 0.5s processing timeout.'''
        cls.task_queue = TaskQueue()
        cls.task_queue.rate = 100000
        cls.task_queue.burst = 100000
        cls.task_queue.processing_timeout = 0.5

    def test_10_put(self):
        '''Putting four distinct task ids yields a queue size of 4.'''
        # NOTE(review): arguments look like (taskid, priority, exetime) -- confirm
        # against TaskQueue.put.
        self.task_queue.put('a3', 0, time.time() + 0.5)
        self.task_queue.put('a4', 3, time.time() + 0.2)
        self.task_queue.put('a2', 0)
        self.task_queue.put('a1', 1)
        self.assertEqual(self.task_queue.size(), 4)

    def test_20_update(self):
        '''Putting ids that are already queued updates them without changing the size.'''
        self.task_queue.put('a2', 4)
        self.assertEqual(self.task_queue.size(), 4)
        self.task_queue.put('a3', 2, 0)
        self.assertEqual(self.task_queue.size(), 4)

    def test_30_get_from_priority_queue(self):
        '''The first get() returns 'a2'; size() still reports 4 afterwards.'''
        self.assertEqual(self.task_queue.get(), 'a2')
        self.assertEqual(self.task_queue.size(), 4)

    def test_40_time_queue_1(self):
        '''After check_update(), 'a3' (re-put with a third argument of 0) is returned.'''
        self.task_queue.check_update()
        self.assertEqual(self.task_queue.get(), 'a3')
        self.assertEqual(self.task_queue.size(), 4)

    def test_50_time_queue_2(self):
        '''After a 0.3s wait and check_update(), 'a4' and then 'a1' are returned.'''
        time.sleep(0.3)
        self.task_queue.check_update()
        self.assertEqual(self.task_queue.get(), 'a4')
        self.assertEqual(self.task_queue.get(), 'a1')
        self.assertEqual(self.task_queue.size(), 4)

    def test_60_processing_queue(self):
        '''
        After sleeping for the 0.5s processing timeout, every task can be
        fetched a second time, in the order a2, a4, a3, a1.
        '''
        time.sleep(0.5)
        self.task_queue.check_update()
        self.assertEqual(self.task_queue.get(), 'a2')
        self.assertEqual(len(self.task_queue), 4)
        self.assertEqual(self.task_queue.get(), 'a4')
        self.assertEqual(self.task_queue.get(), 'a3')
        self.assertEqual(self.task_queue.get(), 'a1')
        self.assertEqual(len(self.task_queue), 4)

    def test_70_done(self):
        '''done() returns a truthy value for each task and the length drops to 0.'''
        self.assertTrue(self.task_queue.done('a2'))
        self.assertTrue(self.task_queue.done('a1'))
        self.assertEqual(len(self.task_queue), 2)
        self.assertTrue(self.task_queue.done('a4'))
        self.assertTrue(self.task_queue.done('a3'))
        self.assertEqual(len(self.task_queue), 0)
75
76
77
from pyspider.scheduler.token_bucket import Bucket
78
79
80
class TestBucket(unittest.TestCase):
    '''Checks the token count reported by Bucket as tokens are consumed and time passes.'''

    def test_bucket(self):
        '''
        A fresh Bucket(100, 1000) reports 1000 and stays at 1000 while idle;
        after desc(100) it reports 900 and then gains roughly 10 per 0.1s.
        '''
        # NOTE(review): the arguments are presumably (rate per second, burst
        # capacity) -- the asserted numbers are consistent with that; confirm
        # against Bucket.__init__.
        bucket = Bucket(100, 1000)
        self.assertEqual(bucket.get(), 1000)
        time.sleep(0.1)
        self.assertEqual(bucket.get(), 1000)
        bucket.desc(100)
        self.assertEqual(bucket.get(), 900)
        time.sleep(0.1)
        # delta=2 leaves room for sleep() jitter.
        self.assertAlmostEqual(bucket.get(), 910, delta=2)
        time.sleep(0.1)
        self.assertAlmostEqual(bucket.get(), 920, delta=2)
93
94
95
try:
96
    from six.moves import xmlrpc_client
97
except ImportError:
98
    import xmlrpclib as xmlrpc_client
99
from pyspider.scheduler.scheduler import Scheduler
100
from pyspider.database.sqlite import taskdb, projectdb, resultdb
101
from pyspider.libs.multiprocessing_queue import Queue
102
from pyspider.libs.utils import run_in_thread
103
104
105
class TestScheduler(unittest.TestCase):
    '''
    End-to-end scenario against a Scheduler running in a background thread.

    setUpClass starts the scheduler together with its XML-RPC endpoint; the
    tests then feed it through newtask_queue / status_queue and observe it
    through scheduler2fetcher and the RPC proxy.  The methods share that one
    scheduler and depend on the state left by earlier ones, which is why they
    are named so that unittest's name ordering runs them in sequence.
    '''

    # sqlite database files used by the test run (removed again in tearDownClass)
    taskdb_path = './data/tests/task.db'
    projectdb_path = './data/tests/project.db'
    resultdb_path = './data/tests/result.db'
    check_project_time = 1
    # port the scheduler's XML-RPC server listens on
    scheduler_xmlrpc_port = 23333

    @classmethod
    def setUpClass(cls):
        '''Reset ./data/tests, open the databases and start the scheduler thread.'''
        shutil.rmtree('./data/tests', ignore_errors=True)
        os.makedirs('./data/tests')

        def get_taskdb():
            return taskdb.TaskDB(cls.taskdb_path)
        cls.taskdb = get_taskdb()

        def get_projectdb():
            return projectdb.ProjectDB(cls.projectdb_path)
        cls.projectdb = get_projectdb()

        def get_resultdb():
            return resultdb.ResultDB(cls.resultdb_path)
        cls.resultdb = get_resultdb()

        cls.newtask_queue = Queue(10)
        cls.status_queue = Queue(10)
        cls.scheduler2fetcher = Queue(10)
        cls.rpc = xmlrpc_client.ServerProxy('http://localhost:%d' % cls.scheduler_xmlrpc_port)

        def run_scheduler():
            # The scheduler gets its own database handles, separate from the
            # ones the test methods use.
            scheduler = Scheduler(taskdb=get_taskdb(), projectdb=get_projectdb(),
                                  newtask_queue=cls.newtask_queue, status_queue=cls.status_queue,
                                  out_queue=cls.scheduler2fetcher, data_path="./data/tests/",
                                  resultdb=get_resultdb())
            # Short intervals and small limits keep the scenario fast.
            scheduler.UPDATE_PROJECT_INTERVAL = 0.1
            scheduler.LOOP_INTERVAL = 0.1
            scheduler.INQUEUE_LIMIT = 10
            scheduler.DELETE_TIME = 0
            scheduler.DEFAULT_RETRY_DELAY = {'': 5}
            scheduler._last_tick = int(time.time())  # not dispatch cronjob
            cls.xmlrpc_thread = run_in_thread(scheduler.xmlrpc_run, port=cls.scheduler_xmlrpc_port)
            scheduler.run()

        cls.process = run_in_thread(run_scheduler)
        # Give the scheduler and its RPC server time to come up.
        time.sleep(1)

    @classmethod
    def tearDownClass(cls):
        '''Stop the scheduler if it is still running, remove the data dir and check ports are closed.'''
        if cls.process.is_alive():
            cls.rpc._quit()
            cls.process.join(5)
        cls.xmlrpc_thread.join()
        assert not cls.process.is_alive()
        shutil.rmtree('./data/tests', ignore_errors=True)
        time.sleep(1)

        assert not utils.check_port_open(5000)
        assert not utils.check_port_open(cls.scheduler_xmlrpc_port)
        assert not utils.check_port_open(24444)
        assert not utils.check_port_open(25555)

    def test_10_new_task_ignore(self):
        '''
        A task for a project that has not been created yet is not queued.

        task_queue = [ ]
        '''
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url'
        })  # unknown project: test_project
        self.assertEqual(self.rpc.size(), 0)
        self.assertEqual(len(self.rpc.get_active_tasks()), 0)

    def test_20_new_project(self):
        '''
        Insert 'test_project' with status TODO.

        task_queue = [ ]
        '''
        self.projectdb.insert('test_project', {
            'name': 'test_project',
            'group': 'group',
            'status': 'TODO',
            'script': 'import time\nprint(time.time())',
            'comments': 'test project',
            'rate': 1.0,
            'burst': 10,
        })

    def test_30_update_project(self):
        '''
        Nothing is dispatched while the project is TODO; after switching it
        to DEBUG and forcing a project update, '_on_get_info' is dispatched.

        task_queue = [ ]
        '''
        # Imported under its own name so the module-level Queue class is not shadowed.
        from six.moves import queue
        with self.assertRaises(queue.Empty):
            self.scheduler2fetcher.get(timeout=1)
        self.projectdb.update('test_project', status="DEBUG")
        time.sleep(0.1)
        self.rpc.update_project()

        task = self.scheduler2fetcher.get(timeout=10)
        self.assertIsNotNone(task)
        self.assertEqual(task['taskid'], '_on_get_info')  # select test_project:_on_get_info data:,_on_get_info

    def test_32_get_info(self):
        '''Report an '_on_get_info' status with an empty 'save' track for test_project.'''
        self.status_queue.put({
            'taskid': '_on_get_info',
            'project': 'test_project',
            'track': {
                'save': {
                },
            },
        })
        # test_project on_get_info {}

    def test_34_new_not_used_project(self):
        '''
        A newly inserted RUNNING project gets an '_on_get_info' task dispatched.

        task_queue = []
        '''
        self.projectdb.insert('test_project_not_started', {
            'name': 'test_project_not_started',
            'group': 'group',
            'status': 'RUNNING',
            'script': 'import time\nprint(time.time())',
            'comments': 'test project',
            'rate': 1.0,
            'burst': 10,
        })
        task = self.scheduler2fetcher.get(timeout=5)  # select test_project_not_started:_on_get_info data:,_on_get_info
        self.assertEqual(task['taskid'], '_on_get_info')

    def test_35_new_task(self):
        '''
        A new task for test_project is dispatched with all of its sections.

        task_queue = [ ]
        '''
        time.sleep(0.2)
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 0,
            },
        })  # new task test_project:taskid url
        # task_queue = [ test_project:taskid ]

        time.sleep(0.5)
        task = self.scheduler2fetcher.get(timeout=10)  # select test_project:taskid
        self.assertGreater(len(self.rpc.get_active_tasks()), 0)
        self.assertIsNotNone(task)
        self.assertEqual(task['taskid'], 'taskid')
        self.assertEqual(task['project'], 'test_project')
        self.assertIn('schedule', task)
        self.assertIn('fetch', task)
        self.assertIn('process', task)
        self.assertIn('track', task)
        self.assertEqual(task['fetch']['data'], 'abc')

    def test_37_force_update_processing_task(self):
        '''
        Submit a force_update for the task that is currently being processed.

        processing = [ test_project:taskid ]
        '''
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url_force_update',
            'schedule': {
                'age': 10,
                'force_update': True,
            },
        })  # restart task test_project:taskid url_force_update
        time.sleep(0.2)
        # it should not block next

    def test_40_taskdone_error_no_project(self):
        '''
        A status pack for an unknown project leaves the queue size at 1.

        processing = [ test_project:taskid ]
        '''
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'no_project',
            'url': 'url'
        })  # unknown project: no_project
        time.sleep(0.1)
        self.assertEqual(self.rpc.size(), 1)

    def test_50_taskdone_error_no_track(self):
        '''
        Status packs without 'track', or with an empty 'track', leave the
        queue size at 1.

        processing = [ test_project:taskid ]
        '''
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url'
        })  # Bad status pack: 'track'
        time.sleep(0.1)
        self.assertEqual(self.rpc.size(), 1)
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'track': {}
        })  # Bad status pack: 'process'
        time.sleep(0.1)
        self.assertEqual(self.rpc.size(), 1)

    def test_60_taskdone_failed_retry(self):
        '''
        A status with process.ok == False gets the task dispatched again.

        processing = [ test_project:taskid ]
        '''
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': False
                },
            }
        })  # task retry 0/3 test_project:taskid url
        # NOTE(review): setUpClass sets DEFAULT_RETRY_DELAY = {'': 5}, which is
        # presumably why up to 5 seconds are allowed for the retry -- confirm.
        task = self.scheduler2fetcher.get(timeout=5)  # select test_project:taskid url
        self.assertIsNotNone(task)

    def test_70_taskdone_ok(self):
        '''
        A fully successful status pack brings the queue size to 0.

        Also called as a helper step from test_90_newtask_with_itag.

        processing = [ test_project:taskid ]
        '''
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': True
                },
            }
        })  # task done test_project:taskid url
        time.sleep(0.2)
        self.assertEqual(self.rpc.size(), 0)

    def test_75_on_finished_msg(self):
        '''
        An 'on_finished' task is dispatched; acknowledging it keeps the size at 0.

        Also called as a helper step from test_90_newtask_with_itag and
        test_a20_failed_retry.
        '''
        task = self.scheduler2fetcher.get(timeout=5)  # select test_project:on_finished data:,on_finished

        self.assertEqual(task['taskid'], 'on_finished')

        self.status_queue.put({
            'taskid': 'on_finished',
            'project': 'test_project',
            'url': 'url',
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': True
                },
            }
        })  # task done test_project:on_finished url
        time.sleep(0.2)
        self.assertEqual(self.rpc.size(), 0)

    def test_80_newtask_age_ignore(self):
        '''
        Re-submitting 'taskid' with age 30 through the queue does not enqueue it.

        processing = [ ]
        '''
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 30,
            },
        })
        time.sleep(0.1)
        self.assertEqual(self.rpc.size(), 0)

    def test_82_newtask_via_rpc(self):
        '''
        Same submission as test_80 but through the RPC newtask() call; the
        size stays 0 as well.

        processing = [ ]
        '''
        self.rpc.newtask({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 30,
            },
        })
        time.sleep(0.1)
        self.assertEqual(self.rpc.size(), 0)

    def test_90_newtask_with_itag(self):
        '''
        Submitting 'taskid' with an 'itag' in its schedule gets it dispatched
        again; the run is then completed via test_70 / test_75.

        task_queue = [ ]
        processing = [ ]
        '''
        time.sleep(0.1)
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'itag': "abc",
                'retries': 1
            },
        })  # restart task test_project:taskid url

        task = self.scheduler2fetcher.get(timeout=10)  # select test_project:taskid url
        self.assertIsNotNone(task)
        self.assertEqual(task['taskid'], 'taskid')

        self.test_70_taskdone_ok()  # task done test_project:taskid url
        self.test_75_on_finished_msg()  # select test_project:on_finished data:,on_finished

    def test_a10_newtask_restart_by_age(self):
        '''Submitting 'taskid' with age 0 gets it dispatched again.'''
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 0,
                'retries': 1
            },
        })  # restart task test_project:taskid url
        task = self.scheduler2fetcher.get(timeout=10)  # select test_project:taskid url
        self.assertIsNotNone(task)
        self.assertEqual(task['taskid'], 'taskid')

    def test_a20_failed_retry(self):
        '''
        The first failure gets the task dispatched again; after the second
        failure only 'on_finished' is dispatched and then nothing else.

        processing: [ test_project:taskid ]
        '''
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': False
                },
            }
        })  # task retry 0/1 test_project:taskid url
        task = self.scheduler2fetcher.get(timeout=5)  # select test_project:taskid url
        self.assertIsNotNone(task)
        self.assertEqual(task['taskid'], 'taskid')

        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'track': {
                'fetch': {
                    'ok': False
                },
                'process': {
                    'ok': False
                },
            }
        })  # task failed test_project:taskid url

        self.test_75_on_finished_msg()  # select test_project:on_finished data:,on_finished

        # Imported under its own name so the module-level Queue class is not shadowed.
        from six.moves import queue
        with self.assertRaises(queue.Empty):
            self.scheduler2fetcher.get(timeout=5)

    def test_a30_task_verify(self):
        '''newtask() returns a falsy value for incomplete or unknown-project tasks and a truthy one for a valid task.'''
        self.assertFalse(self.rpc.newtask({
            # 'taskid' deliberately omitted
            'project': 'test_project',
            'url': 'url',
        }))  # taskid not in task: {'project': 'test_project', 'url': 'url'}
        self.assertFalse(self.rpc.newtask({
            'taskid': 'taskid#',
            # 'project' deliberately omitted
            'url': 'url',
        }))  # project not in task: {'url': 'url', 'taskid': 'taskid#'}
        self.assertFalse(self.rpc.newtask({
            'taskid': 'taskid#',
            'project': 'test_project',
            # 'url' deliberately omitted
        }))  # url not in task: {'project': 'test_project', 'taskid': 'taskid#'}
        self.assertFalse(self.rpc.newtask({
            'taskid': 'taskid#',
            'project': 'not_exist_project',
            'url': 'url',
        }))  # unknown project: not_exist_project
        self.assertTrue(self.rpc.newtask({
            'taskid': 'taskid#',
            'project': 'test_project',
            'url': 'url',
        }))  # new task test_project:taskid# url

    def test_a40_success_recrawl(self):
        '''
        With auto_recrawl set, a successful status gets the task dispatched again.

        task_queue = [ test_project:taskid# ]
        '''
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 0,
                'retries': 1,
                'auto_recrawl': True,
            },
        })  # restart task test_project:taskid url
        task1 = self.scheduler2fetcher.get(timeout=10)  # select test_project:taskid# url
        task2 = self.scheduler2fetcher.get(timeout=10)  # select test_project:taskid url
        self.assertIsNotNone(task1)
        self.assertIsNotNone(task2)
        # The two tasks may arrive in either order.
        self.assertIn('taskid#', (task1['taskid'], task2['taskid']))

        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'schedule': {
                'age': 0,
                'retries': 1,
                'auto_recrawl': True,
            },
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': True
                },
            }
        })  # task done test_project:taskid url
        task = self.scheduler2fetcher.get(timeout=10)
        self.assertIsNotNone(task)

    def test_a50_failed_recrawl(self):
        '''
        With auto_recrawl set, each of three failed statuses gets 'taskid'
        dispatched again.

        time_queue = [ test_project:taskid ]
        scheduler2fetcher = [ test_project:taskid# ]
        processing = [ test_project:taskid# ]
        '''
        for _ in range(3):
            self.status_queue.put({
                'taskid': 'taskid',
                'project': 'test_project',
                'url': 'url',
                'schedule': {
                    'age': 0,
                    'retries': 1,
                    'auto_recrawl': True,
                },
                'track': {
                    'fetch': {
                        'ok': True
                    },
                    'process': {
                        'ok': False
                    },
                }
            })
            # not processing pack: test_project:taskid url
            # select test_project:taskid url
            # task retry 0/1 test_project:taskid url
            # select test_project:taskid url
            # task retry 0/1 test_project:taskid url
            # select test_project:taskid url
            task = self.scheduler2fetcher.get(timeout=10)
            self.assertIsNotNone(task)
            self.assertEqual(task['taskid'], 'taskid')

    def test_a60_disable_recrawl(self):
        '''
        A successful status without auto_recrawl ends the recrawling: nothing
        more is dispatched within 5 seconds.

        time_queue = [ test_project:taskid ]
        scheduler2fetcher = [ test_project:taskid# ]
        processing = [ test_project:taskid# ]
        '''
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'schedule': {
                'age': 0,
                'retries': 1,
            },
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': True
                },
            }
        })  # task done test_project:taskid url

        # Imported under its own name so the module-level Queue class is not shadowed.
        from six.moves import queue
        with self.assertRaises(queue.Empty):
            self.scheduler2fetcher.get(timeout=5)

    def test_38_cancel_task(self):
        '''A task scheduled 30s ahead grows the queue by one; a cancel request for it shrinks it back.'''
        current_size = self.rpc.size()
        self.newtask_queue.put({
            'taskid': 'taskid_to_cancel',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 0,
                'exetime': time.time() + 30
            },
        })  # new task test_project:taskid_to_cancel url
        # task_queue = [ test_project:taskid_to_cancel ]

        time.sleep(0.2)
        self.assertEqual(self.rpc.size(), current_size + 1)

        self.newtask_queue.put({
            'taskid': 'taskid_to_cancel',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'force_update': True,
                'age': 0,
                'cancel': True
            },
        })  # new cancel test_project:taskid_to_cancel url
        # task_queue = [ ]

        time.sleep(0.2)
        self.assertEqual(self.rpc.size(), current_size)

    def test_x10_inqueue_limit(self):
        '''Of 20 tasks submitted to a rate-0 project, only 10 (the INQUEUE_LIMIT set in setUpClass) are queued.'''
        self.projectdb.insert('test_inqueue_project', {
            'name': 'test_inqueue_project',
            'group': 'group',
            'status': 'DEBUG',
            'script': 'import time\nprint(time.time())',
            'comments': 'test project',
            'rate': 0,
            'burst': 0,
        })
        time.sleep(0.1)
        pre_size = self.rpc.size()
        for i in range(20):
            self.newtask_queue.put({
                'taskid': 'taskid%d' % i,
                'project': 'test_inqueue_project',
                'url': 'url',
                'schedule': {
                    'age': 3000,
                    'force_update': True,
                },
            })
        time.sleep(1)
        self.assertEqual(self.rpc.size() - pre_size, 10)

    def test_x20_delete_project(self):
        '''
        Setting status STOP and group "lock,delete" removes the project, its
        tasks and its counters within a second (DELETE_TIME is 0 in setUpClass).
        '''
        self.assertIsNotNone(self.projectdb.get('test_inqueue_project'))
        self.projectdb.update('test_inqueue_project', status="STOP", group="lock,delete")
        time.sleep(1)
        self.assertIsNone(self.projectdb.get('test_inqueue_project'))
        # NOTE(review): presumably refreshes this handle's view of the existing
        # project tables before the lookup below -- confirm against TaskDB.
        self.taskdb._list_project()
        self.assertIsNone(self.taskdb.get_task('test_inqueue_project', 'taskid1'))
        self.assertNotIn('test_inqueue_project', self.rpc.counter('5m', 'sum'))

    def test_z10_startup(self):
        '''The scheduler thread is still alive at the end of the scenario.'''
        self.assertTrue(self.process.is_alive())

    def test_z20_quit(self):
        '''_quit() over RPC stops the scheduler; 'taskid' is stored with status SUCCESS.'''
        self.rpc._quit()
        time.sleep(0.2)
        self.assertFalse(self.process.is_alive())
        self.assertEqual(
            self.taskdb.get_task('test_project', 'taskid')['status'],
            self.taskdb.SUCCESS
        )
737
738
739
from pyspider.scheduler.scheduler import Project
740
741
class TestProject(unittest.TestCase):
    '''
    Ordered scenario for the `paused` property of a scheduler Project.

    All tests share the Project built in setUpClass and keep pushing entries
    onto its `active_tasks` deque, so each one depends on what the earlier
    ones pushed (unittest runs the methods in name order).
    '''

    # entry representing a dispatched task (carries 'type': Scheduler.TASK_PACK)
    task_pack = {
        'type': Scheduler.TASK_PACK,
        'taskid': 'taskid',
        'project': 'test_project',
        'url': 'url',
        'fetch': {
            'data': 'abc',
        },
        'process': {
            'data': 'abc',
        },
        'schedule': {
            'age': 0,
        },
    }

    # status entry whose fetch and process both succeeded
    status_ok_pack = {
        'taskid': 'taskid',
        'project': 'test_project',
        'url': 'url',
        'schedule': {
            'age': 0,
            'retries': 1,
        },
        'track': {
            'fetch': {
                'ok': True
            },
            'process': {
                'ok': True
            },
        }
    }

    # status entry whose fetch and process both failed
    status_fail_pack = {
        'taskid': 'taskid',
        'project': 'test_project',
        'url': 'url',
        'schedule': {
            'age': 0,
            'retries': 1,
        },
        'track': {
            'fetch': {
                'ok': False
            },
            'process': {
                'ok': False
            },
        }
    }

    @classmethod
    def setUpClass(cls):
        '''Build a Scheduler without databases or queues (PAUSE_TIME = 2) and one RUNNING Project on it.'''
        cls.scheduler = Scheduler(taskdb=None, projectdb=None, newtask_queue=None, status_queue=None, out_queue=None)
        cls.scheduler.PAUSE_TIME = 2
        cls.project = Project(cls.scheduler, {
            'name': 'test_project_not_started',
            'group': 'group',
            'status': 'RUNNING',
            'script': 'import time\nprint(time.time())',
            'comments': 'test project',
            'rate': 1.0,
            'burst': 10,
            'updatetime': time.time(),
        })

    def _push_active(self, pack, count=1):
        '''Push `count` fresh shallow copies of `pack`, each stamped with the current time, onto the left of active_tasks.'''
        for _ in range(count):
            self.project.active_tasks.appendleft((time.time(), dict(pack)))

    def test_pause_10_unpaused(self):
        '''A project with no recorded activity is not paused.'''
        self.assertFalse(self.project.paused)

    def test_pause_20_no_enough_fail_tasks(self):
        '''Failure runs interrupted by a success, or shorter than FAIL_PAUSE_NUM, do not pause the project.'''
        self._push_active(self.task_pack, 3)
        self.assertFalse(self.project.paused)

        self._push_active(self.status_ok_pack)
        self._push_active(self.status_fail_pack, self.scheduler.FAIL_PAUSE_NUM - 5)
        self.assertFalse(self.project.paused)

        self._push_active(self.status_fail_pack, 5)
        self._push_active(self.status_ok_pack)
        self.assertFalse(self.project.paused)

        self._push_active(self.task_pack, self.scheduler.FAIL_PAUSE_NUM)
        self.assertFalse(self.project.paused)

    def test_pause_30_paused(self):
        '''FAIL_PAUSE_NUM failures followed by FAIL_PAUSE_NUM task packs pause the project.'''
        self._push_active(self.status_fail_pack, self.scheduler.FAIL_PAUSE_NUM)
        self._push_active(self.task_pack, self.scheduler.FAIL_PAUSE_NUM)
        self.assertTrue(self.project.paused)

    def test_pause_40_unpause_checking(self):
        '''After sleeping 3s (longer than PAUSE_TIME = 2) the project reports not paused.'''
        time.sleep(3)
        self.assertFalse(self.project.paused)

    def test_pause_50_paused_again(self):
        '''UNPAUSE_CHECK_NUM further failures pause the project again.'''
        self._push_active(self.status_fail_pack, self.scheduler.UNPAUSE_CHECK_NUM)
        self.assertTrue(self.project.paused)

    def test_pause_60_unpause_checking(self):
        '''After another 3s sleep the project reports not paused again.'''
        time.sleep(3)
        self.assertFalse(self.project.paused)

    def test_pause_70_unpaused(self):
        '''With a success recorded before the failures, both `paused` and `_paused` are false.'''
        self._push_active(self.status_ok_pack)
        self._push_active(self.status_fail_pack, self.scheduler.UNPAUSE_CHECK_NUM)
        self._push_active(self.task_pack, self.scheduler.FAIL_PAUSE_NUM)
        self.assertFalse(self.project.paused)
        self.assertFalse(self.project._paused)

    def test_pause_x_disable_auto_pause(self):
        '''With FAIL_PAUSE_NUM set to 0, even 100 failures do not pause the project.'''
        saved_fail_pause_num = self.scheduler.FAIL_PAUSE_NUM
        self.scheduler.FAIL_PAUSE_NUM = 0
        try:
            self._push_active(self.status_fail_pack, 100)
            self.assertFalse(self.project.paused)
        finally:
            # Restore even if the assertion fails, so the override cannot
            # leak into anything else using the shared scheduler.
            self.scheduler.FAIL_PAUSE_NUM = saved_fail_pause_num
870
871
872
if __name__ == '__main__':
    # Run every test case in this module when the file is executed directly.
    unittest.main()
874