Code Duplication: length = 18 lines, in 2 locations

tests/test_manhole.py 2 locations

@@ 128-145 (lines=18) @@
125
            wait_for_strings(proc.read, TIMEOUT, 'Waiting for new connection')
126
            assert_manhole_running(proc, uds_path)
127
128
129
@mark.skipif(is_module_available('eventlet'), reason="eventlet can't deal with extra threads at process exit")
def test_daemon_connection():
    """Check that talking to the manhole fails once the helper process is interrupted.

    Starts the ``test_daemon_connection`` scenario of the helper script, reads
    the manhole socket path from its output, then runs
    ``assert_manhole_running`` with an ``extra`` callback that sends SIGINT to
    the helper and keeps writing to the client socket. The whole call is
    expected to raise ``socket.error`` or ``OSError``; afterwards the helper
    must still print its ``In atexit handler`` line.

    Skipped when eventlet is importable (see the skip reason).
    """
    with TestProcess(sys.executable, HELPER, 'test_daemon_connection') as proc:
        with dump_on_error(proc.read):
            # The helper prints the socket path; wait for it, then extract it
            # from everything read so far.
            wait_for_strings(proc.read, TIMEOUT, '/tmp/manhole-')
            uds_path = re.findall(r"(/tmp/manhole-\d+)", proc.read())[0]
            wait_for_strings(proc.read, TIMEOUT, 'Waiting for new connection')

            def terminate_and_read(client):
                # Interrupt the helper and wait until it reports that it died.
                proc.proc.send_signal(signal.SIGINT)
                wait_for_strings(proc.read, TIMEOUT, 'Died with KeyboardInterrupt', 'DIED.')
                # Keep poking the socket; one of these send/recv calls is
                # expected to fail (the enclosing ``raises`` asserts that).
                for _ in range(5):
                    client.sock.send(b'bogus()\n')
                    time.sleep(0.05)
                    print(repr(client.sock.recv(1024)))

            # NOTE(review): ``raises`` is presumably ``pytest.raises`` in its
            # callable form, and ``extra`` presumably gets called with the
            # connected client -- confirm against the file's imports/helpers.
            raises((socket.error, OSError), assert_manhole_running, proc, uds_path, extra=terminate_and_read)
            wait_for_strings(proc.read, TIMEOUT, 'In atexit handler')
147
148
@@ 108-125 (lines=18) @@
105
                        wait_for_strings(proc.read, TIMEOUT, 'FOOBAR')
106
                        sock.send(b"tete()\n")
107
                        wait_for_strings(proc.read, TIMEOUT, 'TETE')
108
                        sock.send(b"exit()\n")
109
                        wait_for_strings(proc.read, TIMEOUT, 'Exiting exec loop.')
110
111
112
def test_install_once():
    """Run the ``test_install_once`` helper scenario and wait for ``ALREADY_INSTALLED`` in its output."""
    with TestProcess(sys.executable, HELPER, 'test_install_once') as proc:
        # Bind the reader once; it feeds both the error dump and the wait.
        reader = proc.read
        with dump_on_error(reader):
            wait_for_strings(reader, TIMEOUT, 'ALREADY_INSTALLED')
116
117
118
def test_install_twice_not_strict():
    """Run the ``test_install_twice_not_strict`` helper scenario and verify the manhole is usable.

    Waits for the "Not patching os.fork ..." message and the socket path in
    the helper's output, extracts the path, waits for the manhole to accept
    connections, then hands over to ``assert_manhole_running``.
    """
    with TestProcess(sys.executable, HELPER, 'test_install_twice_not_strict') as proc:
        reader = proc.read
        with dump_on_error(reader):
            # These two markers must show up, checked one after the other.
            for expected in (
                'Not patching os.fork and os.forkpty. Oneshot activation is done by signal',
                '/tmp/manhole-',
            ):
                wait_for_strings(reader, TIMEOUT, expected)

            socket_path = re.findall(r"(/tmp/manhole-\d+)", reader())[0]
            wait_for_strings(reader, TIMEOUT, 'Waiting for new connection')
            assert_manhole_running(proc, socket_path)
127
128