|
1
|
|
|
#!/usr/bin/env python |
|
2
|
|
|
# -*- coding: utf-8 -*- |
|
3
|
|
|
# This script is based on the launcher script of Platoon.
|
4
|
|
|
|
|
5
|
|
|
|
|
6
|
|
|
from __future__ import print_function

import argparse
import logging
import os
import shlex
import subprocess
import sys
|
12
|
|
|
# Configure the root logger at INFO level for the launcher process.
logging.basicConfig(level=logging.INFO)
|
13
|
|
|
|
|
14
|
|
|
|
|
15
|
|
|
def parse_arguments(argv=None):
    """Parse the launcher's command-line options.

    Args:
        argv: Optional list of argument strings to parse.  ``None`` (the
            default) makes argparse read the process command line, which
            is what callers passing no argument get.

    Returns:
        argparse.Namespace with the attributes ``worker_path``,
        ``gpu_list`` (list of one or more str), ``port`` (int),
        ``easgd_alpha``, ``log`` and ``workers_args``.
    """
    ap = argparse.ArgumentParser(description="Launch a multi-GPU expreiment")
    ap.add_argument('worker_path', help='Path of worker')
    ap.add_argument('gpu_list', nargs='+', type=str,
                    help='The list of Theano GPU ids (Ex: gpu0, cuda1) the '
                         'script will use. 1 GPU id = 1 worker launched.')
    ap.add_argument("--port", type=int, default=5567)
    # No ``type=`` here: the value stays a string so that the default
    # "auto" and an explicit number share one option.
    ap.add_argument("--easgd_alpha", default="auto")
    ap.add_argument("--log", type=str, default=None)
    ap.add_argument('-w', '--workers-args', required=False,
                    help='The arguments that will be passed to your workers. '
                         '(Ex: -w="learning_rate=0.1")')
    return ap.parse_args(argv)
|
24
|
|
|
|
|
25
|
|
|
|
|
26
|
|
|
def launch_process(is_server, args, device, path=""):
    """Start the scheduling server or one worker as a child process.

    Args:
        is_server: When true, run the module ``deepy.multigpu.server``;
            otherwise run the script at ``path``.
        args: Iterable of extra command-line argument strings appended to
            the command, or ``None`` for no extra arguments.
        device: Device name; ``device=<device>`` is appended to the
            ``THEANO_FLAGS`` variable in the child's environment.
        path: Path of the worker script.  Only used when ``is_server`` is
            false.

    Returns:
        The ``subprocess.Popen`` object of the started child.  The server
        inherits this process's stdout/stderr; a worker's stdout/stderr
        are discarded.
    """
    print("Starting {0} on {1} ...".format("server" if is_server else "worker", device), end=' ')

    env = dict(os.environ)
    inherited_flags = env.get('THEANO_FLAGS', '')
    device_flag = 'device={}'.format(device)
    # Only join with a comma when there is something to join to, so an
    # unset/empty THEANO_FLAGS does not yield a leading ",".
    if inherited_flags:
        env['THEANO_FLAGS'] = '{},{}'.format(inherited_flags, device_flag)
    else:
        env['THEANO_FLAGS'] = device_flag

    # Run children with the interpreter that runs this launcher rather
    # than whatever "python" resolves to on PATH.
    interpreter = sys.executable or "python"
    if is_server:
        command = [interpreter, "-u", "-m", "deepy.multigpu.server"]
    else:
        command = [interpreter, "-u", path]
    if args is not None:
        command.extend(args)

    # Worker output is silenced.  It is sent to os.devnull instead of a
    # pipe: nothing ever reads a pipe here, so a worker writing more than
    # the pipe buffer holds would block forever.
    sink = None if is_server else open(os.devnull, "wb")
    try:
        process = subprocess.Popen(command, bufsize=0, env=env,
                                   stdout=sink, stderr=sink)
    finally:
        # The child holds its own copy of the descriptor.
        if sink is not None:
            sink.close()
    print("Done")
    return process
|
42
|
|
|
|
|
43
|
|
|
if __name__ == '__main__':
    # Launch one scheduling server plus one worker per listed device, then
    # wait for all of them; if any child fails, kill the remaining ones.
    args = parse_arguments()

    # pid -> (display name, Popen object) for every child not yet reaped.
    process_map = {}

    easgd_alpha = args.easgd_alpha
    if easgd_alpha == "auto":
        easgd_alpha = 1.0 / len(args.gpu_list)

    # Build the argument list directly instead of formatting a string and
    # re-splitting it, so a --log path containing spaces stays one argument.
    controller_args = ["--port", str(args.port),
                       "--easgd_alpha", str(easgd_alpha)]
    if args.log is not None:
        # Without this guard the literal text "None" would be passed on.
        # NOTE(review): assumes the server treats a missing --log as
        # "no log file" -- confirm against deepy.multigpu.server.
        controller_args += ["--log", args.log]
    p = launch_process(True, controller_args, "cpu")
    process_map[p.pid] = ('scheduling server', p)

    worker_args = shlex.split(args.workers_args or '')
    for device in args.gpu_list:
        worker_process = launch_process(False, worker_args, device, args.worker_path)
        process_map[worker_process.pid] = ("worker_{}".format(device),
                                           worker_process)

    print("\n### Waiting on experiment to finish ...")

    # Silly error handling but that will do for now.
    while process_map:
        pid, status = os.wait()
        if pid not in process_map:
            print("Recieved status for unknown process {}".format(pid))
            # Not one of ours: skip it instead of failing on the lookup.
            continue

        name, p = process_map.pop(pid)

        # os.wait() returns an encoded wait status, not an exit code.
        # Decode it the way subprocess does: exit code, or -signal.
        if os.WIFSIGNALED(status):
            returncode = -os.WTERMSIG(status)
        elif os.WIFEXITED(status):
            returncode = os.WEXITSTATUS(status)
        else:
            returncode = status
        print("{} terminated with return code: {}.".format(name, returncode))

        if returncode != 0:
            print("\nWARNING! An error has occurred.")
            survivors = [proc for _, proc in process_map.values()]
            for proc in survivors:
                try:
                    proc.kill()
                except OSError:
                    # Already gone; nothing left to kill.
                    pass
            # Block until each killed child is reaped rather than spinning
            # on poll() and re-sending the signal.
            for proc in survivors:
                proc.wait()
            process_map.clear()
|
86
|
|
|
|