Code Duplication: blocks of 20-21 lines repeated in 2 locations

tests/test_manhole.py (2 locations)

@@ 338-358 (lines=21) @@
335
    with TestProcess(sys.executable, '-u', HELPER, 'test_with_fork') as proc:
336
        with dump_on_error(proc.read):
337
            wait_for_strings(proc.read, TIMEOUT, '/tmp/manhole-')
338
            uds_path = re.findall(r"(/tmp/manhole-\d+)", proc.read())[0]
339
            wait_for_strings(proc.read, TIMEOUT, 'Waiting for new connection')
340
            for _ in range(2):
341
                proc.reset()
342
                assert_manhole_running(proc, uds_path)
343
344
            proc.reset()
345
            wait_for_strings(proc.read, TIMEOUT, 'Fork detected')
346
            wait_for_strings(proc.read, TIMEOUT, '/tmp/manhole-')
347
            new_uds_path = re.findall(r"(/tmp/manhole-\d+)", proc.read())[0]
348
            assert uds_path != new_uds_path
349
350
            wait_for_strings(proc.read, TIMEOUT, 'Waiting for new connection')
351
            for _ in range(2):
352
                proc.reset()
353
                assert_manhole_running(proc, new_uds_path)
354
355
356
@mark.skipif(hasattr(sys, 'pypy_version_info'), reason="pypy doesn't support forkpty")
357
def test_with_forkpty():
358
    with TestProcess(sys.executable, '-u', HELPER, 'test_with_forkpty') as proc:
359
        with dump_on_error(proc.read):
360
            wait_for_strings(proc.read, TIMEOUT, '/tmp/manhole-')
361
            uds_path = re.findall(r"(/tmp/manhole-\d+)", proc.read())[0]
@@ 316-335 (lines=20) @@
313
            with TestSocket(sock) as client:
314
                with dump_on_error(client.read):
315
                    wait_for_strings(client.read, TIMEOUT, "ThreadID", "ProcessID", ">>>")
316
                    sock.send(b"print('FOOBAR')\n")
317
                    wait_for_strings(client.read, TIMEOUT, "FOOBAR")
318
319
                    wait_for_strings(proc.read, TIMEOUT, 'UID:%s' % os.getuid())
320
                    sock.shutdown(socket.SHUT_WR)
321
                    select.select([sock], [], [], 5)
322
                    sock.recv(1024)
323
                    try:
324
                        sock.shutdown(socket.SHUT_RD)
325
                    except Exception as exc:
326
                        print("Failed to SHUT_RD: %s" % exc)
327
                    try:
328
                        sock.close()
329
                    except Exception as exc:
330
                        print("Failed to close socket: %s" % exc)
331
            wait_for_strings(proc.read, TIMEOUT, 'DONE.', 'Cleaned up.', 'Waiting for new connection')
332
333
334
def test_with_fork():
335
    with TestProcess(sys.executable, '-u', HELPER, 'test_with_fork') as proc:
336
        with dump_on_error(proc.read):
337
            wait_for_strings(proc.read, TIMEOUT, '/tmp/manhole-')
338
            uds_path = re.findall(r"(/tmp/manhole-\d+)", proc.read())[0]