Code Duplication    Length = 18-18 lines in 2 locations

tests/test_manhole.py 2 locations

@@ 128-145 (lines=18) @@
125
126
@mark.skipif(is_module_available('eventlet'), reason="evenlet can't deal with extra threads at process exit")
127
def test_daemon_connection():
128
    with TestProcess(sys.executable, HELPER, 'test_daemon_connection') as proc:
129
        with dump_on_error(proc.read):
130
            wait_for_strings(proc.read, TIMEOUT, '/tmp/manhole-')
131
            uds_path = re.findall(r"(/tmp/manhole-\d+)", proc.read())[0]
132
            wait_for_strings(proc.read, TIMEOUT, 'Waiting for new connection')
133
134
            def terminate_and_read(client):
135
                proc.proc.send_signal(signal.SIGINT)
136
                wait_for_strings(proc.read, TIMEOUT, 'Died with KeyboardInterrupt', 'DIED.')
137
                for _ in range(5):
138
                    client.sock.send(b'bogus()\n')
139
                    time.sleep(0.05)
140
                    print(repr(client.sock.recv(1024)))
141
142
            raises((socket.error, OSError), assert_manhole_running, proc, uds_path, extra=terminate_and_read)
143
            wait_for_strings(proc.read, TIMEOUT, 'In atexit handler')
144
145
146
@mark.skipif(is_module_available('eventlet'), reason="evenlet can't deal with extra threads at process exit")
147
def test_non_daemon_connection():
148
    with TestProcess(sys.executable, HELPER, 'test_simple') as proc:
@@ 108-125 (lines=18) @@
105
                        sock.send(b"print('FOOBAR')\n")
106
                        wait_for_strings(proc.read, TIMEOUT, 'FOOBAR')
107
108
109
def test_install_once():
110
    with TestProcess(sys.executable, HELPER, 'test_install_once') as proc:
111
        with dump_on_error(proc.read):
112
            wait_for_strings(proc.read, TIMEOUT, 'ALREADY_INSTALLED')
113
114
115
def test_install_twice_not_strict():
116
    with TestProcess(sys.executable, HELPER, 'test_install_twice_not_strict') as proc:
117
        with dump_on_error(proc.read):
118
            wait_for_strings(proc.read, TIMEOUT,
119
                             'Not patching os.fork and os.forkpty. Oneshot activation is done by signal')
120
            wait_for_strings(proc.read, TIMEOUT, '/tmp/manhole-')
121
            uds_path = re.findall(r"(/tmp/manhole-\d+)", proc.read())[0]
122
            wait_for_strings(proc.read, TIMEOUT, 'Waiting for new connection')
123
            assert_manhole_running(proc, uds_path)
124
125
126
@mark.skipif(is_module_available('eventlet'), reason="evenlet can't deal with extra threads at process exit")
127
def test_daemon_connection():
128
    with TestProcess(sys.executable, HELPER, 'test_daemon_connection') as proc: