Completed
Push — master ( 663472...7037a7 )
by Roy
28s
created

TestTimeQueue   A

Complexity

Total Complexity 6

Size/Duplication

Total Lines 66
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 66
rs 10
wmc 6

1 Method

Rating   Name   Duplication   Size   Complexity  
B test_time_queue() 0 63 6
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
import time
5
import unittest
6
7
import six
8
from six.moves import queue as Queue
9
10
from pyspider.scheduler.task_queue import InQueueTask, TaskQueue
11
12
13
class TestTaskQueue(unittest.TestCase):
    """Ordering checks for ``TaskQueue`` when every task is put with ``exetime=0``."""

    def test_task_queue_in_time_order(self):
        """Check that ``get()`` hands tasks back by descending priority.

        100 tasks are put with priorities 0..9 (ten tasks per priority).
        The test asserts that tasks come back highest priority first and,
        within a single priority, in the order they were put.
        """
        task_queue = TaskQueue(rate=300, burst=1000)

        # One reference FIFO per priority, used to verify the order in which
        # tasks of the same priority come back out of the task queue.
        expected_by_priority = {}
        task_by_id = {}

        for seq in range(100):
            queued = InQueueTask(str(seq), priority=seq // 10, exetime=0)
            task_queue.put(queued.taskid, queued.priority, queued.exetime)

            fifo = expected_by_priority.get(queued.priority)
            if fifo is None:
                fifo = Queue.Queue()
                expected_by_priority[queued.priority] = fifo
            fifo.put(queued)

            task_by_id[queued.taskid] = queued
            six.print_('put, taskid=', queued.taskid, 'priority=', queued.priority, 'exetime=', queued.exetime)

        for position in range(100):
            got_id = task_queue.get()
            got = task_by_id[got_id]
            expected = expected_by_priority[got.priority].get()

            # Same-priority tasks must come back in insertion order.
            self.assertEqual(got_id, expected.taskid)
            # Each block of ten results belongs to the next lower priority,
            # starting from priority 9.
            self.assertEqual(got.priority, 9 - position // 10)
            six.print_('get, taskid=', got.taskid, 'priority=', got.priority, 'exetime=', got.exetime)

        # Everything fetched is expected to be counted as "processing".
        self.assertEqual(task_queue.size(), 100)
        self.assertEqual(task_queue.priority_queue.qsize(), 0)
        self.assertEqual(task_queue.processing.qsize(), 100)

        # Every reference FIFO must have been fully drained.
        for fifo in six.itervalues(expected_by_priority):
            self.assertEqual(fifo.qsize(), 0)
52
53
54
class TestTimeQueue(unittest.TestCase):
    """Checks on how ``TaskQueue`` handles tasks whose ``exetime`` is in the future."""

    def _assert_queue_sizes(self, task_queue, priority, processing, delayed):
        """Assert the qsize of the priority, processing and time queues, in that order."""
        self.assertEqual(task_queue.priority_queue.qsize(), priority)
        self.assertEqual(task_queue.processing.qsize(), processing)
        self.assertEqual(task_queue.time_queue.qsize(), delayed)

    def test_time_queue(self):
        """Exercise the time queue in two phases.

        Phase 1 puts 20 tasks with increasing future ``exetime`` values and
        asserts that reading ``time_queue`` directly yields them in put order.

        Phase 2 puts 20 such tasks again, sleeps until all of them are due,
        calls ``check_update()`` and asserts that they have moved to
        ``priority_queue``; ``get()`` must then return same-priority tasks in
        put order and leave all 20 in ``processing``.
        """
        six.print_('Test time queue order by time only')

        task_queue = TaskQueue(rate=300, burst=1000)
        reference_fifo = Queue.Queue()
        step = 5.0 / 1000  # spacing between consecutive exetime values, in seconds

        # ---- Phase 1: time_queue read directly -------------------------
        for seq in range(20):
            due_at = time.time() + (seq + 1) * step
            queued = InQueueTask(str(seq), priority=seq // 10, exetime=due_at)
            task_queue.put(queued.taskid, queued.priority, queued.exetime)
            reference_fifo.put(queued)
            six.print_('put, taskid=', queued.taskid, 'priority=', queued.priority, 'exetime=', queued.exetime)

        self._assert_queue_sizes(task_queue, 0, 0, 20)

        for _ in range(20):
            expected = reference_fifo.get()
            actual = task_queue.time_queue.get()
            self.assertEqual(expected.taskid, actual.taskid)
            six.print_('get, taskid=', actual.taskid, 'priority=', actual.priority, 'exetime=', actual.exetime)

        self._assert_queue_sizes(task_queue, 0, 0, 0)

        # ---- Phase 2: promotion via check_update() ---------------------
        expected_by_priority = {}
        task_by_id = {}
        for seq in range(20):
            level = seq // 10
            due_at = time.time() + (seq + 1) * step
            queued = InQueueTask(str(seq), priority=level, exetime=due_at)
            task_queue.put(queued.taskid, queued.priority, queued.exetime)
            task_by_id[queued.taskid] = queued

            fifo = expected_by_priority.get(level)
            if fifo is None:
                fifo = Queue.Queue()
                expected_by_priority[level] = fifo
            fifo.put(queued)

        self._assert_queue_sizes(task_queue, 0, 0, 20)

        # Wait until the latest exetime has passed, then let the queue
        # re-evaluate which tasks are due.
        time.sleep(20 * step)
        task_queue.check_update()
        self._assert_queue_sizes(task_queue, 20, 0, 0)

        for _ in range(20):
            got_id = task_queue.get()
            got = task_by_id[got_id]
            expected = expected_by_priority[got.priority].get()
            self.assertEqual(got.taskid, expected.taskid)

        self._assert_queue_sizes(task_queue, 0, 20, 0)
120
121
122
# Run the test cases in this module when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
124