import os
import sys
import argparse
import time as t
from os.path import join as pjoin
from textwrap import dedent

import logging
import subprocess

import smartdispatch
from smartdispatch.command_manager import CommandManager
from smartdispatch.queue import Queue
from smartdispatch.job_generator import job_generator_factory
from smartdispatch import get_available_queues
from smartdispatch import launch_jobs
from smartdispatch import utils

LOGS_FOLDERNAME = "SMART_DISPATCH_LOGS"
CLUSTER_NAME = utils.detect_cluster()
AVAILABLE_QUEUES = get_available_queues(CLUSTER_NAME)
LAUNCHER = utils.get_launcher(CLUSTER_NAME)

# Autoresume settings.
TIMEOUT_EXIT_CODE = 124
AUTORESUME_TRIGGER_AFTER = '$(($PBS_WALLTIME - 60))'  # By default, 60s before the maximum walltime.
AUTORESUME_WORKER_CALL_PREFIX = 'timeout -s TERM {trigger_after} '.format(trigger_after=AUTORESUME_TRIGGER_AFTER)
AUTORESUME_WORKER_CALL_SUFFIX = ' WORKER_PIDS+=" $!"'
AUTORESUME_PROLOG = 'WORKER_PIDS=""'
AUTORESUME_EPILOG = """\
NEED_TO_RESUME=false
for WORKER_PID in $WORKER_PIDS; do
    wait "$WORKER_PID"
    RETURN_CODE=$?
    if [ $RETURN_CODE -eq {timeout_exit_code} ]; then
        NEED_TO_RESUME=true
    fi
done
if [ "$NEED_TO_RESUME" = true ]; then
    echo "Autoresuming using: {{launcher}} $PBS_FILENAME"
    sd-launch-pbs --launcher {{launcher}} $PBS_FILENAME {{path_job}}
fi
""".format(timeout_exit_code=TIMEOUT_EXIT_CODE)


def main(argv=None):
    # Necessary if we want 'logging.info' to appear in stderr.
    logging.root.setLevel(logging.INFO)

    args = parse_arguments(argv)
    path_smartdispatch_logs = pjoin(os.getcwd(), LOGS_FOLDERNAME)

    # Check whether we are in LAUNCH or RESUME mode.
    if args.mode == "launch":
        if args.commandsFile is not None:
            # Commands are listed in a file.
            jobname = smartdispatch.generate_logfolder_name(os.path.basename(args.commandsFile.name), max_length=235)
            commands = smartdispatch.get_commands_from_file(args.commandsFile)
        else:
            # The command needs to be parsed and unfolded.
            command = " ".join(args.commandAndOptions)
            jobname = smartdispatch.generate_name_from_command(command, max_length=235)
            commands = smartdispatch.unfold_command(command)
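            # `unfold_command` expands bracketed argument lists into the cross-product of
            # commands (illustrative, assuming the bracket syntax from the smartdispatch README):
            #   "python train.py --lr [0.01 0.1]" -> ["python train.py --lr 0.01",
            #                                         "python train.py --lr 0.1"]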

        commands = smartdispatch.replace_uid_tag(commands)
        nb_commands = len(commands)  # For the summary printed at the end.

        if args.batchName:
            jobname = smartdispatch.generate_logfolder_name(utils.slugify(args.batchName), max_length=235)

    elif args.mode == "resume":
        jobname = args.batch_uid
        if os.path.isdir(jobname):
            # `jobname` is actually a path to the job folder; extract the real `jobname` from it.
            jobname = os.path.basename(os.path.abspath(jobname))

        if not os.path.isdir(pjoin(path_smartdispatch_logs, jobname)):
            raise LookupError("Batch UID ({0}) does not exist! Cannot resume.".format(jobname))
    else:
        raise ValueError("Unknown subcommand!")

    job_folders_paths = smartdispatch.get_job_folders(path_smartdispatch_logs, jobname)
    path_job, path_job_logs, path_job_commands = job_folders_paths

    # Keep a log of the command line in the job folder.
    command_line = " ".join(sys.argv)
    smartdispatch.log_command_line(path_job, command_line)

    command_manager = CommandManager(pjoin(path_job_commands, "commands.txt"))
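    # Note: "commands.txt" acts as the shared work list for this batch; the workers
    # launched below are expected to pop commands from it one at a time (an assumption
    # about the CommandManager/worker protocol, not verified here).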

    # In launch mode, populate the list of commands to run; in resume mode, reset
    # commands that were still marked as running.
    if args.mode == "launch":
        command_manager.set_commands_to_run(commands)
    elif args.mode == "resume":
        # Check whether there are failed commands.
        failed_commands = command_manager.get_failed_commands()
        if len(failed_commands) > 0:
            FAILED_COMMAND_MESSAGE = dedent("""\
                {nb_failed} command(s) are in a failed state. They won't be resumed.
                Failed commands:
                {failed_commands}
                The actual errors can be found in the log folder under:
                {failed_commands_err_file}""")
            utils.print_boxed(FAILED_COMMAND_MESSAGE.format(
                nb_failed=len(failed_commands),
                failed_commands=''.join(failed_commands),
                failed_commands_err_file='\n'.join([utils.generate_uid_from_string(c[:-1]) + '.err' for c in failed_commands])
            ))

            if not utils.yes_no_prompt("Do you want to continue?", 'n'):
                sys.exit()

        if args.expandPool is None:
            command_manager.reset_running_commands()

        nb_commands = command_manager.get_nb_commands_to_run()

        if args.expandPool is not None:
            args.pool = min(nb_commands, args.expandPool)
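        # A bare `--expandPool` falls back to `const=sys.maxsize` in the argument parser,
        # so the `min` above effectively means "one worker per pending command".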

    # If no pool size is specified, use the number of commands to run.
    if args.pool is None:
        args.pool = command_manager.get_nb_commands_to_run()

    # Generate all the worker commands.
    worker_script = pjoin(os.path.dirname(smartdispatch.__file__), 'workers', 'base_worker.py')
    worker_script_flags = ''
    if args.autoresume:
        worker_script_flags = '-r'

    worker_call_prefix = ''
    worker_call_suffix = ''
    if args.autoresume:
        worker_call_prefix = AUTORESUME_WORKER_CALL_PREFIX
        worker_call_suffix = AUTORESUME_WORKER_CALL_SUFFIX

    COMMAND_STRING = 'cd "{cwd}"; {worker_call_prefix}python2 {worker_script} {worker_script_flags} "{commands_file}" "{log_folder}" '\
                     '1>> "{log_folder}/worker/$PBS_JOBID\"\"_worker_{{ID}}.o" '\
                     '2>> "{log_folder}/worker/$PBS_JOBID\"\"_worker_{{ID}}.e" &'\
                     '{worker_call_suffix}'
    COMMAND_STRING = COMMAND_STRING.format(cwd=os.getcwd(), worker_call_prefix=worker_call_prefix, worker_script=worker_script,
                                           worker_script_flags=worker_script_flags, commands_file=command_manager._commands_filename,
                                           log_folder=path_job_logs, worker_call_suffix=worker_call_suffix)
    commands = [COMMAND_STRING.format(ID=i) for i in range(args.pool)]
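    # Illustrative expansion of one worker command (paths are hypothetical):
    #   cd "<cwd>"; python2 <smartdispatch>/workers/base_worker.py  "<path_job_commands>/commands.txt" "<path_job_logs>"
    #       1>> "<path_job_logs>/worker/$PBS_JOBID""_worker_0.o" 2>> "<path_job_logs>/worker/$PBS_JOBID""_worker_0.e" &
    # With --autoresume, AUTORESUME_WORKER_CALL_PREFIX/SUFFIX wrap this call in `timeout` and record its PID.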

    # TODO: use args.memPerNode instead of float('inf').
    queue = Queue(args.queueName, CLUSTER_NAME, args.walltime, args.coresPerNode, args.gpusPerNode, float('inf'), args.modules)

    # Check that the requested number of cores does not exceed what a node offers.
    if args.coresPerCommand > queue.nb_cores_per_node:
        sys.stderr.write("smart-dispatch: error: coresPerCommand exceeds the number of cores per node: "
                         "asked for {req_cores} cores, but nodes have {node_cores}\n"
                         .format(req_cores=args.coresPerCommand, node_cores=queue.nb_cores_per_node))
        sys.exit(2)

    # Check that the requested number of gpus does not exceed what a node offers.
    if args.gpusPerCommand > queue.nb_gpus_per_node:
        sys.stderr.write("smart-dispatch: error: gpusPerCommand exceeds the number of gpus per node: "
                         "asked for {req_gpus} gpus, but nodes have {node_gpus}\n"
                         .format(req_gpus=args.gpusPerCommand, node_gpus=queue.nb_gpus_per_node))
        sys.exit(2)

    command_params = {'nb_cores_per_command': args.coresPerCommand,
                      'nb_gpus_per_command': args.gpusPerCommand,
                      'mem_per_command': None  # args.memPerCommand
                      }

    prolog = []
    epilog = ['wait']
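    # Workers are started in the background (the trailing '&' in COMMAND_STRING), so the
    # PBS script must 'wait' for them before exiting. With --autoresume, this epilog is
    # replaced by AUTORESUME_EPILOG, which waits on each recorded worker PID and requeues
    # the job if any worker hit the walltime timeout.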
    if args.autoresume:
        prolog = [AUTORESUME_PROLOG]
        epilog = [AUTORESUME_EPILOG.format(launcher=LAUNCHER if args.launcher is None else args.launcher, path_job=path_job)]

    job_generator = job_generator_factory(queue, commands, prolog, epilog, command_params, CLUSTER_NAME, path_job)

    # Generate a default name for each job in the batch.
    for pbs_id, pbs in enumerate(job_generator.pbs_list):
        proper_size_name = utils.jobname_generator(jobname, pbs_id)
        pbs.add_options(N=proper_size_name)

    if args.pbsFlags is not None:
        job_generator.add_pbs_flags(args.pbsFlags.split(' '))
    pbs_filenames = job_generator.write_pbs_files(path_job_commands)

    # Launch the jobs.
    print "## {nb_commands} command(s) will be executed in {nb_jobs} job(s) ##".format(nb_commands=nb_commands, nb_jobs=len(pbs_filenames))
    print "Batch UID:\n{batch_uid}".format(batch_uid=jobname)
    if not args.doNotLaunch:
        try:
            launch_jobs(LAUNCHER if args.launcher is None else args.launcher, pbs_filenames, CLUSTER_NAME, path_job)
        except subprocess.CalledProcessError as e:
            sys.stderr.write("smart-dispatch: error: The launcher wasn't able to launch the job(s) properly. "
                             "The generated PBS file(s) may be invalid. The following error message was returned:\n{}".format(e.output))
            sys.exit(2)

    print "\nLogs, commands and job ids related to this batch will be in:\n {smartdispatch_folder}".format(smartdispatch_folder=path_job)


def parse_arguments(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('-q', '--queueName', required=True, help='Queue used (ex: qwork@mp2, qfat256@mp2, gpu_1)')
    parser.add_argument('-n', '--batchName', required=False, help='The name of the batch. Default: the launched commands.')
    parser.add_argument('-t', '--walltime', required=False, help='Set the estimated running time of your jobs using the DD:HH:MM:SS format. Note that they will be killed when this time limit is reached.')
    parser.add_argument('-L', '--launcher', choices=['qsub', 'msub'], required=False, help='Which launcher to use. Default: qsub')
    parser.add_argument('-C', '--coresPerNode', type=int, required=False, help='How many cores there are per node.')
    parser.add_argument('-G', '--gpusPerNode', type=int, required=False, help='How many gpus there are per node.')
    # parser.add_argument('-M', '--memPerNode', type=int, required=False, help='How much memory there is per node (in Gb).')

    parser.add_argument('-c', '--coresPerCommand', type=int, required=False, help='How many cores a command needs.', default=1)
    parser.add_argument('-g', '--gpusPerCommand', type=int, required=False, help='How many gpus a command needs.', default=1)
    # parser.add_argument('-m', '--memPerCommand', type=float, required=False, help='How much memory a command needs (in Gb).')
    parser.add_argument('-f', '--commandsFile', type=file, required=False, help='File containing commands to launch. Each command must be on a separate line. (Replaces commandAndOptions)')

    parser.add_argument('-l', '--modules', type=str, required=False, help='List of additional modules to load.', nargs='+')
    parser.add_argument('-x', '--doNotLaunch', action='store_true', help='Generate all the files without launching the job.')
    parser.add_argument('-r', '--autoresume', action='store_true', help='Requeue the job when the running time hits the maximum walltime allowed on the cluster. Assumes that commands are resumable.')

    parser.add_argument('-p', '--pool', type=int, help="Number of workers that will be consuming commands. Default: number of commands.")
    parser.add_argument('--pbsFlags', type=str, help='ADVANCED USAGE: Allows passing a space-separated list of PBS flags. Ex: --pbsFlags="-lfeature=k80 -t0-4"')
    subparsers = parser.add_subparsers(dest="mode")

    launch_parser = subparsers.add_parser('launch', help="Launch jobs.")
    launch_parser.add_argument("commandAndOptions", help="The command to launch, with its options.", nargs=argparse.REMAINDER)

    resume_parser = subparsers.add_parser('resume', help="Resume jobs from batch UID.")
    resume_parser.add_argument('--expandPool', type=int, nargs='?', const=sys.maxsize, help='Add workers to the given batch. Default: number of pending commands.')
    resume_parser.add_argument("batch_uid", help="Batch UID of the jobs to resume.")

    args = parser.parse_args(argv)

    # Check for invalid argument combinations.
    if args.mode == "launch":
        if args.commandsFile is None and len(args.commandAndOptions) < 1:
            parser.error("You need to specify a command to launch.")
        if args.queueName not in AVAILABLE_QUEUES and ((args.coresPerNode is None and args.gpusPerNode is None) or args.walltime is None):
            parser.error("Unknown queue: you must specify --walltime and either --coresPerNode or --gpusPerNode.")
        if args.coresPerCommand < 1:
            parser.error("coresPerCommand must be at least 1.")

    return args


if __name__ == "__main__":
    main()