TestPlumdUtilsSignalWaiter   A
last analyzed

Complexity

Total Complexity 2

Size/Duplication

Total Lines 37
Duplicated Lines 0 %

Importance

Changes 2
Bugs 2 Features 1
Metric Value
c 2
b 2
f 1
dl 0
loc 37
rs 10
wmc 2

2 Methods

Rating   Name   Duplication   Size   Complexity  
A test_signalwaiter_normal2() 0 18 1
A test_signalwaiter_normal1() 0 14 1
1
import os
2
import sys
3
PY3 = sys.version_info > (3,)
4
5
import time
6
import signal
7
import threading
8
9
import unittest
10
11
# Pause, in seconds, used by the tests after sending a signal so the
# waiter thread has a moment to react before its state is checked.
SLEEP_TIME = 0.01

# NOTE(review): not referenced in the visible code -- presumably meant
# for unittest's maxDiff setting; confirm before removing.
MAX_DIFF = 8192

# Resolved location of this test module and of its containing directory.
FILE_PATH = os.path.realpath(__file__)
PATH = os.path.split(FILE_PATH)[0]

# Put the directory three levels above this file at the front of the
# import search path (presumably the project root that holds the plumd
# package -- confirm against the repository layout).
sys.path.insert(
    0,
    os.path.realpath(os.path.join(PATH, "../../../")),
)
16
import plumd.util
17
18
19
def call_wait(swaiter):
    """Thread target: block on the given waiter by calling its wait().

    :param swaiter: any object exposing a no-argument ``wait()`` method
        (the tests pass a ``plumd.util.SignalWaiter``).
    :return: None -- the result of ``wait()`` is intentionally discarded.
    """
    swaiter.wait()
21
22
23
@unittest.skipUnless(
    all(hasattr(signal, name) for name in ("SIGUSR1", "SIGUSR2", "SIGHUP")),
    "requires SIGUSR1, SIGUSR2 and SIGHUP",
)
class TestPlumdUtilsSignalWaiter(unittest.TestCase):
    """Exercise plumd.util.SignalWaiter from a background thread.

    Every test builds a waiter with SIGUSR1 as the signal to wait for and
    SIGUSR2/SIGHUP as signals to ignore, runs ``call_wait`` on it in a
    separate thread, sends signals to the current process with
    ``os.kill`` and then checks whether that thread is still alive.
    """

    # Upper bound, in seconds, on how long a test waits for the waiter
    # thread to finish. join() returns as soon as the thread exits, so a
    # passing run does not pay this cost.
    JOIN_TIMEOUT = 1.0

    def _start_waiter(self):
        """Create the signal waiter and start a thread blocked on it.

        :return: the started ``threading.Thread`` running ``call_wait``.
        """
        sig_wait = [signal.SIGUSR1]
        sig_ignore = [signal.SIGUSR2, signal.SIGHUP]
        swait = plumd.util.SignalWaiter(sig_wait=sig_wait,
                                        sig_ignore=sig_ignore)
        thr = threading.Thread(group=None, target=call_wait, args=(swait,))
        # Daemon thread: if an assertion below fails while the thread is
        # still blocked, it must not keep the interpreter from exiting.
        thr.daemon = True
        thr.start()
        return thr

    def test_signalwaiter_normal1(self):
        """ensure signal waiter functioning normally"""
        thr = self._start_waiter()
        self.assertTrue(thr.is_alive())

        # The waited-for signal is expected to end the waiting thread.
        os.kill(os.getpid(), signal.SIGUSR1)
        # Bounded join instead of a fixed short sleep: avoids a spurious
        # failure when the thread needs slightly longer to wind down.
        thr.join(self.JOIN_TIMEOUT)
        self.assertFalse(thr.is_alive())

    def test_signalwaiter_normal2(self):
        """ensure signal waiter functioning normally"""
        thr = self._start_waiter()
        self.assertTrue(thr.is_alive())
        self_pid = os.getpid()

        # Signals on the ignore list are expected to leave the thread
        # blocked.
        os.kill(self_pid, signal.SIGUSR2)
        os.kill(self_pid, signal.SIGHUP)
        time.sleep(SLEEP_TIME)
        self.assertTrue(thr.is_alive())

        # The waited-for signal is expected to end the waiting thread.
        os.kill(self_pid, signal.SIGUSR1)
        thr.join(self.JOIN_TIMEOUT)
        self.assertFalse(thr.is_alive())
60
61
62
# Run the tests in this module when the file is executed as a script;
# importing the module must not start a test run.
if __name__ == '__main__':
    unittest.main()
64