Inspection completed: Push to master (d27a46...273c66) by Roy, created 01:17

Analyzed method: TestScheduler.test_32_get_info() (grade A)

Complexity:   Conditions 1
Size:         Total Lines 6
Duplication:  Lines 0, Ratio 0 %
Importance:   Changes 1 (Bugs 0, Features 1)

Metric   Value
cc       1        cyclomatic complexity (matches "Conditions 1")
c        1        changes
b        0        bugs
f        1        features
dl       0        duplicated lines
loc      6        total lines
rs       9.4285
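The exact analyzer behind these numbers is not named in the report. As a rough local cross-check, a minimal sketch using radon (the tests/test_scheduler.py path is an assumption) produces comparable cc and loc figures:

# Minimal local cross-check of the cc/loc figures (a sketch; radon is an
# assumption -- the report's own toolchain is not identified above).
from radon.complexity import cc_visit
from radon.raw import analyze

with open("tests/test_scheduler.py") as f:   # assumed path of the file below
    source = f.read()

print("file loc:", analyze(source).loc)      # raw line count for the whole file
for block in cc_visit(source):               # per-function/method complexity
    if block.name == "test_32_get_info":
        # a straight-line test method has cyclomatic complexity 1
        print(block.name, block.complexity)

The reviewed source file follows.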
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2014-02-08 22:37:13

import os
import time
import shutil
import unittest2 as unittest
import logging
import logging.config
logging.config.fileConfig("pyspider/logging.conf")

from pyspider.scheduler.task_queue import TaskQueue
from pyspider.libs import utils


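# TestTaskQueue walks a handful of tasks through TaskQueue's three stages:
# the ready/priority queue (test_10..test_30: the priority bumped in test_20
# appears to decide that 'a2' is handed out first), the time queue for tasks
# with a future exetime (test_40/test_50 need check_update() and a short
# sleep before 'a3' and 'a4' become available), and the processing queue
# (test_60: tasks fetched but not done() within processing_timeout=0.5s are
# handed out again; test_70 drains them with done()).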
class TestTaskQueue(unittest.TestCase):

    @classmethod
    def setUpClass(self):
        self.task_queue = TaskQueue()
        self.task_queue.rate = 100000
        self.task_queue.burst = 100000
        self.task_queue.processing_timeout = 0.5

    def test_10_put(self):
        self.task_queue.put('a3', 0, time.time() + 0.5)
        self.task_queue.put('a4', 3, time.time() + 0.2)
        self.task_queue.put('a2', 0)
        self.task_queue.put('a1', 1)
        self.assertEqual(self.task_queue.size(), 4)

    def test_20_update(self):
        self.task_queue.put('a2', 4)
        self.assertEqual(self.task_queue.size(), 4)
        self.task_queue.put('a3', 2, 0)
        self.assertEqual(self.task_queue.size(), 4)

    def test_30_get_from_priority_queue(self):
        self.assertEqual(self.task_queue.get(), 'a2')
        self.assertEqual(self.task_queue.size(), 4)

    def test_40_time_queue_1(self):
        self.task_queue.check_update()
        self.assertEqual(self.task_queue.get(), 'a3')
        self.assertEqual(self.task_queue.size(), 4)

    def test_50_time_queue_2(self):
        time.sleep(0.3)
        self.task_queue.check_update()
        self.assertEqual(self.task_queue.get(), 'a4')
        self.assertEqual(self.task_queue.get(), 'a1')
        self.assertEqual(self.task_queue.size(), 4)

    def test_60_processing_queue(self):
        time.sleep(0.5)
        self.task_queue.check_update()
        self.assertEqual(self.task_queue.get(), 'a2')
        self.assertEqual(len(self.task_queue), 4)
        self.assertEqual(self.task_queue.get(), 'a4')
        self.assertEqual(self.task_queue.get(), 'a3')
        self.assertEqual(self.task_queue.get(), 'a1')
        self.assertEqual(len(self.task_queue), 4)

    def test_70_done(self):
        self.assertTrue(self.task_queue.done('a2'))
        self.assertTrue(self.task_queue.done('a1'))
        self.assertEqual(len(self.task_queue), 2)
        self.assertTrue(self.task_queue.done('a4'))
        self.assertTrue(self.task_queue.done('a3'))
        self.assertEqual(len(self.task_queue), 0)


from pyspider.scheduler.token_bucket import Bucket


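# TestBucket covers the token bucket used for rate limiting. Bucket(100, 1000)
# appears to mean a refill rate of 100 tokens/sec with a burst size of 1000:
# after desc(100) drops the level to 900 it climbs back by roughly 10 tokens
# per 0.1 s sleep, and the assertAlmostEqual deltas absorb timer jitter.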
class TestBucket(unittest.TestCase):

    def test_bucket(self):
        bucket = Bucket(100, 1000)
        self.assertEqual(bucket.get(), 1000)
        time.sleep(0.1)
        self.assertEqual(bucket.get(), 1000)
        bucket.desc(100)
        self.assertEqual(bucket.get(), 900)
        time.sleep(0.1)
        self.assertAlmostEqual(bucket.get(), 910, delta=2)
        time.sleep(0.1)
        self.assertAlmostEqual(bucket.get(), 920, delta=2)


try:
    from six.moves import xmlrpc_client
except ImportError:
    import xmlrpclib as xmlrpc_client
from pyspider.scheduler.scheduler import Scheduler
from pyspider.database.sqlite import taskdb, projectdb, resultdb
from pyspider.libs.multiprocessing_queue import Queue
from pyspider.libs.utils import run_in_thread


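# TestScheduler drives a real Scheduler instance end to end: new tasks are fed
# in through newtask_queue, completion reports come back through status_queue,
# and whatever the scheduler selects for fetching is read from
# scheduler2fetcher. Projects and tasks live in sqlite databases under
# ./data/tests, and a small XML-RPC interface on port 23333 is used for
# out-of-band calls such as size(), newtask(), update_project() and _quit().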
class TestScheduler(unittest.TestCase):
    taskdb_path = './data/tests/task.db'
    projectdb_path = './data/tests/project.db'
    resultdb_path = './data/tests/result.db'
    check_project_time = 1
    scheduler_xmlrpc_port = 23333

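    # setUpClass starts the scheduler in a background thread (run_in_thread)
    # with aggressive test settings: LOOP_INTERVAL and UPDATE_PROJECT_INTERVAL
    # are dropped to 0.1 s, INQUEUE_LIMIT to 10, and _last_tick is pre-set so
    # that no cronjob is dispatched while the tests run.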
    @classmethod
    def setUpClass(self):
        shutil.rmtree('./data/tests', ignore_errors=True)
        os.makedirs('./data/tests')

        def get_taskdb():
            return taskdb.TaskDB(self.taskdb_path)
        self.taskdb = get_taskdb()

        def get_projectdb():
            return projectdb.ProjectDB(self.projectdb_path)
        self.projectdb = get_projectdb()

        def get_resultdb():
            return resultdb.ResultDB(self.resultdb_path)
        self.resultdb = get_resultdb()

        self.newtask_queue = Queue(10)
        self.status_queue = Queue(10)
        self.scheduler2fetcher = Queue(10)
        self.rpc = xmlrpc_client.ServerProxy('http://localhost:%d' % self.scheduler_xmlrpc_port)

        def run_scheduler():
            scheduler = Scheduler(taskdb=get_taskdb(), projectdb=get_projectdb(),
                                  newtask_queue=self.newtask_queue, status_queue=self.status_queue,
                                  out_queue=self.scheduler2fetcher, data_path="./data/tests/",
                                  resultdb=get_resultdb())
            scheduler.UPDATE_PROJECT_INTERVAL = 0.1
            scheduler.LOOP_INTERVAL = 0.1
            scheduler.INQUEUE_LIMIT = 10
            scheduler.DELETE_TIME = 0
            scheduler.DEFAULT_RETRY_DELAY = {'': 5}
            scheduler._last_tick = int(time.time())  # so no cronjob is dispatched during the tests
            self.xmlrpc_thread = run_in_thread(scheduler.xmlrpc_run, port=self.scheduler_xmlrpc_port)
            scheduler.run()

        self.process = run_in_thread(run_scheduler)
        time.sleep(1)

    @classmethod
    def tearDownClass(self):
        if self.process.is_alive():
            self.rpc._quit()
            self.process.join(5)
        self.xmlrpc_thread.join()
        assert not self.process.is_alive()
        shutil.rmtree('./data/tests', ignore_errors=True)
        time.sleep(1)

        assert not utils.check_port_open(5000)
        assert not utils.check_port_open(self.scheduler_xmlrpc_port)
        assert not utils.check_port_open(24444)
        assert not utils.check_port_open(25555)

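    # The test methods below are order dependent: unittest runs them in
    # alphanumeric name order (test_10, test_20, ..., test_a10, ..., test_x10,
    # test_z20), and each one continues from the queue state left by the
    # previous one. The docstrings record the expected state on entry.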
    def test_10_new_task_ignore(self):
        '''
        task_queue = [ ]
        '''
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url'
        })  # unknown project: test_project
        self.assertEqual(self.rpc.size(), 0)
        self.assertEqual(len(self.rpc.get_active_tasks()), 0)

    def test_20_new_project(self):
        '''
        task_queue = [ ]
        '''
        self.projectdb.insert('test_project', {
            'name': 'test_project',
            'group': 'group',
            'status': 'TODO',
            'script': 'import time\nprint(time.time())',
            'comments': 'test project',
            'rate': 1.0,
            'burst': 10,
        })

    def test_30_update_project(self):
        '''
        task_queue = [ ]
        '''
        from six.moves import queue as Queue
        with self.assertRaises(Queue.Empty):
            task = self.scheduler2fetcher.get(timeout=1)
        self.projectdb.update('test_project', status="DEBUG")
        time.sleep(0.1)
        self.rpc.update_project()

        task = self.scheduler2fetcher.get(timeout=10)
        self.assertIsNotNone(task)
        self.assertEqual(task['taskid'], '_on_get_info')  # select test_project:_on_get_info data:,_on_get_info

    def test_32_get_info(self):
        self.status_queue.put({
            'taskid': '_on_get_info',
            'project': 'test_project',
            'track': {
                'save': {}
            }
        })
        # test_project on_get_info {}

    def test_34_new_not_used_project(self):
        '''
        task_queue = []
        '''
        self.projectdb.insert('test_project_not_started', {
            'name': 'test_project_not_started',
            'group': 'group',
            'status': 'RUNNING',
            'script': 'import time\nprint(time.time())',
            'comments': 'test project',
            'rate': 1.0,
            'burst': 10,
        })
        task = self.scheduler2fetcher.get(timeout=1)  # select test_project_not_started:_on_get_info data:,_on_get_info
        self.assertEqual(task['taskid'], '_on_get_info')

    def test_35_new_task(self):
        '''
        task_queue = [ ]
        '''
        time.sleep(0.2)
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 0,
            },
        })  # new task test_project:taskid url
        # task_queue = [ test_project:taskid ]

        time.sleep(0.5)
        task = self.scheduler2fetcher.get(timeout=10)  # select test_project:taskid
        self.assertGreater(len(self.rpc.get_active_tasks()), 0)
        self.assertIsNotNone(task)
        self.assertEqual(task['taskid'], 'taskid')
        self.assertEqual(task['project'], 'test_project')
        self.assertIn('schedule', task)
        self.assertIn('fetch', task)
        self.assertIn('process', task)
        self.assertIn('track', task)
        self.assertEqual(task['fetch']['data'], 'abc')

    def test_37_force_update_processing_task(self):
        '''
        processing = [ test_project:taskid ]
        '''
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url_force_update',
            'schedule': {
                'age': 10,
                'force_update': True,
            },
        })  # restart task test_project:taskid url_force_update
        time.sleep(0.2)
        # it should not block the next test

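    # The taskdone group (test_40..test_75) feeds status packs back through
    # status_queue. A valid pack needs a 'track' dict with 'fetch' and
    # 'process' sections; judging by the inline comments, process.ok == False
    # triggers a retry (the task is selected again), while fetch.ok and
    # process.ok both True marks the task done and eventually produces the
    # project's on_finished message.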
    def test_40_taskdone_error_no_project(self):
        '''
        processing = [ test_project:taskid ]
        '''
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'no_project',
            'url': 'url'
        })  # unknown project: no_project
        time.sleep(0.1)
        self.assertEqual(self.rpc.size(), 1)

    def test_50_taskdone_error_no_track(self):
        '''
        processing = [ test_project:taskid ]
        '''
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url'
        })  # Bad status pack: 'track'
        time.sleep(0.1)
        self.assertEqual(self.rpc.size(), 1)
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'track': {}
        })  # Bad status pack: 'process'
        time.sleep(0.1)
        self.assertEqual(self.rpc.size(), 1)

    def test_60_taskdone_failed_retry(self):
        '''
        processing = [ test_project:taskid ]
        '''
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': False
                },
            }
        })  # task retry 0/3 test_project:taskid url
        from six.moves import queue as Queue
        # with self.assertRaises(Queue.Empty):
            # task = self.scheduler2fetcher.get(timeout=4)
        task = self.scheduler2fetcher.get(timeout=5)  # select test_project:taskid url
        self.assertIsNotNone(task)

    def test_70_taskdone_ok(self):
        '''
        processing = [ test_project:taskid ]
        '''
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': True
                },
            }
        })  # task done test_project:taskid url
        time.sleep(0.2)
        self.assertEqual(self.rpc.size(), 0)

    def test_75_on_finished_msg(self):
        task = self.scheduler2fetcher.get(timeout=5)  # select test_project:on_finished data:,on_finished

        self.assertEqual(task['taskid'], 'on_finished')

    def test_80_newtask_age_ignore(self):
        '''
        processing = [ ]
        '''
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 30,
            },
        })
        time.sleep(0.1)
        self.assertEqual(self.rpc.size(), 0)

    def test_82_newtask_via_rpc(self):
        '''
        processing = [ ]
        '''
        self.rpc.newtask({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 30,
            },
        })
        time.sleep(0.1)
        self.assertEqual(self.rpc.size(), 0)

    def test_90_newtask_with_itag(self):
        '''
        task_queue = [ ]
        processing = [ ]
        '''
        time.sleep(0.1)
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'itag': "abc",
                'retries': 1
            },
        })  # restart task test_project:taskid url

        task = self.scheduler2fetcher.get(timeout=10)  # select test_project:taskid url
        self.assertIsNotNone(task)
        self.assertEqual(task['taskid'], 'taskid')

        self.test_70_taskdone_ok()  # task done test_project:taskid url
        self.test_75_on_finished_msg()  # select test_project:on_finished data:,on_finished

    def test_a10_newtask_restart_by_age(self):
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 0,
                'retries': 1
            },
        })  # restart task test_project:taskid url
        task = self.scheduler2fetcher.get(timeout=10)  # select test_project:taskid url
        self.assertIsNotNone(task)
        self.assertEqual(task['taskid'], 'taskid')

    def test_a20_failed_retry(self):
        '''
        processing: [ test_project:taskid ]
        '''
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': False
                },
            }
        })  # task retry 0/1 test_project:taskid url
        task = self.scheduler2fetcher.get(timeout=5)  # select test_project:taskid url
        self.assertIsNotNone(task)
        self.assertEqual(task['taskid'], 'taskid')

        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'track': {
                'fetch': {
                    'ok': False
                },
                'process': {
                    'ok': False
                },
            }
        })  # task failed test_project:taskid url

        self.test_75_on_finished_msg()  # select test_project:on_finished data:,on_finished

        from six.moves import queue as Queue
        with self.assertRaises(Queue.Empty):
            self.scheduler2fetcher.get(timeout=5)

    def test_a30_task_verify(self):
        self.assertFalse(self.rpc.newtask({
            #'taskid': 'taskid#',
            'project': 'test_project',
            'url': 'url',
        }))  # taskid not in task: {'project': 'test_project', 'url': 'url'}
        self.assertFalse(self.rpc.newtask({
            'taskid': 'taskid#',
            #'project': 'test_project',
            'url': 'url',
        }))  # project not in task: {'url': 'url', 'taskid': 'taskid#'}
        self.assertFalse(self.rpc.newtask({
            'taskid': 'taskid#',
            'project': 'test_project',
            #'url': 'url',
        }))  # url not in task: {'project': 'test_project', 'taskid': 'taskid#'}
        self.assertFalse(self.rpc.newtask({
            'taskid': 'taskid#',
            'project': 'not_exist_project',
            'url': 'url',
        }))  # unknown project: not_exist_project
        self.assertTrue(self.rpc.newtask({
            'taskid': 'taskid#',
            'project': 'test_project',
            'url': 'url',
        }))  # new task test_project:taskid# url

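    # The recrawl tests: with schedule.auto_recrawl set, a task that is
    # reported done (or that keeps failing) is scheduled again, so
    # scheduler2fetcher keeps yielding it; reporting it done without
    # auto_recrawl in test_a60 stops the cycle and the queue stays empty.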
    def test_a40_success_recrawl(self):
        '''
        task_queue = [ test_project:taskid# ]
        '''
        self.newtask_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 0,
                'retries': 1,
                'auto_recrawl': True,
            },
        })  # restart task test_project:taskid url
        task1 = self.scheduler2fetcher.get(timeout=10)  # select test_project:taskid# url
        task2 = self.scheduler2fetcher.get(timeout=10)  # select test_project:taskid url
        self.assertIsNotNone(task1)
        self.assertIsNotNone(task2)
        self.assertTrue(task1['taskid'] == 'taskid#' or task2['taskid'] == 'taskid#')

        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'schedule': {
                'age': 0,
                'retries': 1,
                'auto_recrawl': True,
            },
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': True
                },
            }
        })  # task done test_project:taskid url
        task = self.scheduler2fetcher.get(timeout=10)
        self.assertIsNotNone(task)

    def test_a50_failed_recrawl(self):
        '''
        time_queue = [ test_project:taskid ]
        scheduler2fetcher = [ test_project:taskid# ]
        processing = [ test_project:taskid# ]
        '''
        for i in range(3):
            self.status_queue.put({
                'taskid': 'taskid',
                'project': 'test_project',
                'url': 'url',
                'schedule': {
                    'age': 0,
                    'retries': 1,
                    'auto_recrawl': True,
                },
                'track': {
                    'fetch': {
                        'ok': True
                    },
                    'process': {
                        'ok': False
                    },
                }
            })
            # not processing pack: test_project:taskid url
            # select test_project:taskid url
            # task retry 0/1 test_project:taskid url
            # select test_project:taskid url
            # task retry 0/1 test_project:taskid url
            # select test_project:taskid url
            task = self.scheduler2fetcher.get(timeout=10)
            self.assertIsNotNone(task)
            self.assertEqual(task['taskid'], 'taskid')

    def test_a60_disable_recrawl(self):
        '''
        time_queue = [ test_project:taskid ]
        scheduler2fetcher = [ test_project:taskid# ]
        processing = [ test_project:taskid# ]
        '''
        self.status_queue.put({
            'taskid': 'taskid',
            'project': 'test_project',
            'url': 'url',
            'schedule': {
                'age': 0,
                'retries': 1,
            },
            'track': {
                'fetch': {
                    'ok': True
                },
                'process': {
                    'ok': True
                },
            }
        })  # task done test_project:taskid url

        from six.moves import queue as Queue
        with self.assertRaises(Queue.Empty):
            self.scheduler2fetcher.get(timeout=5)

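    # Note the numbering: test_38_cancel_task is defined here but, because of
    # the alphanumeric ordering, it actually runs between test_37 and test_40.
    # It checks that a queued task whose exetime is still 30 s away can be
    # removed again by resubmitting the same taskid with schedule.force_update
    # and schedule.cancel set.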
    def test_38_cancel_task(self):
        current_size = self.rpc.size()
        self.newtask_queue.put({
            'taskid': 'taskid_to_cancel',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'age': 0,
                'exetime': time.time() + 30
            },
        })  # new task test_project:taskid_to_cancel url
        # task_queue = [ test_project:taskid_to_cancel ]

        time.sleep(0.2)
        self.assertEqual(self.rpc.size(), current_size+1)

        self.newtask_queue.put({
            'taskid': 'taskid_to_cancel',
            'project': 'test_project',
            'url': 'url',
            'fetch': {
                'data': 'abc',
            },
            'process': {
                'data': 'abc',
            },
            'schedule': {
                'force_update': True,
                'age': 0,
                'cancel': True
            },
        })  # new cancel test_project:taskid_to_cancel url
        # task_queue = [ ]

        time.sleep(0.2)
        self.assertEqual(self.rpc.size(), current_size)

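    # INQUEUE_LIMIT was set to 10 in setUpClass, so only 10 of the 20 tasks
    # submitted for test_inqueue_project are accepted into the queue.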
    def test_x10_inqueue_limit(self):
        self.projectdb.insert('test_inqueue_project', {
            'name': 'test_inqueue_project',
            'group': 'group',
            'status': 'DEBUG',
            'script': 'import time\nprint(time.time())',
            'comments': 'test project',
            'rate': 0,
            'burst': 0,
        })
        time.sleep(0.1)
        pre_size = self.rpc.size()
        for i in range(20):
            self.newtask_queue.put({
                'taskid': 'taskid%d' % i,
                'project': 'test_inqueue_project',
                'url': 'url',
                'schedule': {
                    'age': 3000,
                    'force_update': True,
                },
            })
        time.sleep(1)
        self.assertEqual(self.rpc.size() - pre_size, 10)

    def test_x20_delete_project(self):
        self.assertIsNotNone(self.projectdb.get('test_inqueue_project'))
        #self.assertIsNotNone(self.taskdb.get_task('test_inqueue_project', 'taskid1'))
        self.projectdb.update('test_inqueue_project', status="STOP", group="lock,delete")
        time.sleep(1)
        self.assertIsNone(self.projectdb.get('test_inqueue_project'))
        self.taskdb._list_project()
        self.assertIsNone(self.taskdb.get_task('test_inqueue_project', 'taskid1'))
        self.assertNotIn('test_inqueue_project', self.rpc.counter('5m', 'sum'))

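    # The z tests shut everything down: rpc._quit() stops the scheduler loop,
    # the background thread exits, and the 'taskid' task is expected to be
    # persisted in taskdb with SUCCESS status.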
    def test_z10_startup(self):
        self.assertTrue(self.process.is_alive())

    def test_z20_quit(self):
        self.rpc._quit()
        time.sleep(0.2)
        self.assertFalse(self.process.is_alive())
        self.assertEqual(
            self.taskdb.get_task('test_project', 'taskid')['status'],
            self.taskdb.SUCCESS
        )

if __name__ == '__main__':
    unittest.main()