Completed
Pull Request — master (#167)
by
unknown
28s
created

parse_arguments()   F

Complexity

Conditions 9

Size

Total Lines 43

Duplication

Lines 0
Ratio 0 %

Importance

Changes 9
Bugs 0 Features 1
Metric Value
cc 9
c 9
b 0
f 1
dl 0
loc 43
rs 3
1
import os
2
import sys
3
import argparse
4
import time as t
5
from os.path import join as pjoin
6
from textwrap import dedent
7
8
9
import smartdispatch
10
from command_manager import CommandManager
11
import subprocess
12
from queue import Queue
13
from job_generator import job_generator_factory
14
from smartdispatch import get_available_queues
15
from smartdispatch import launch_jobs
16
from smartdispatch import utils
17
18
import logging
19
import smartdispatch
20
21
# Folder, joined to the current working directory in main(), under which the
# logs of every batch are kept.
LOGS_FOLDERNAME = "SMART_DISPATCH_LOGS"

# Cluster-dependent settings; all three are resolved once, at import time.
CLUSTER_NAME = utils.detect_cluster()
AVAILABLE_QUEUES = get_available_queues(CLUSTER_NAME)
# Default launcher; main() uses it unless --launcher is given.
LAUNCHER = utils.get_launcher(CLUSTER_NAME)
25
26
# Autoresume settings.
# Exit status compared against each worker's return code in the epilog below;
# 124 is what GNU `timeout` reports when the time limit was reached.
TIMEOUT_EXIT_CODE = 124
AUTORESUME_TRIGGER_AFTER = '$(($PBS_WALLTIME - 60))'  # By default, 60s before the maximum walltime.
# Shell fragments placed before/after each worker call when autoresume is on:
# the prefix runs the worker under `timeout`, the suffix records its PID.
AUTORESUME_WORKER_CALL_PREFIX = 'timeout -s TERM {trigger_after} '.format(trigger_after=AUTORESUME_TRIGGER_AFTER)
AUTORESUME_WORKER_CALL_SUFFIX = ' WORKER_PIDS+=" $!"'
AUTORESUME_PROLOG = 'WORKER_PIDS=""'
# Shell epilog: waits on every recorded worker and relaunches the PBS file if
# any of them exited with TIMEOUT_EXIT_CODE.  `{{launcher}}` and `{{path_job}}`
# are left as `{launcher}` / `{path_job}` placeholders for a second .format()
# call made in main().
AUTORESUME_EPILOG = """\
NEED_TO_RESUME=false
for WORKER_PID in $WORKER_PIDS; do
    wait "$WORKER_PID"
    RETURN_CODE=$?
    if [ $RETURN_CODE -eq {timeout_exit_code} ]; then
        NEED_TO_RESUME=true
    fi
done
if [ "$NEED_TO_RESUME" = true ]; then
    echo "Autoresuming using: {{launcher}} $PBS_FILENAME"
    sd-launch-pbs --launcher {{launcher}} $PBS_FILENAME {{path_job}}
fi
""".format(timeout_exit_code=TIMEOUT_EXIT_CODE)
46
47
48
def main(argv=None):
    """Generate the PBS files of a batch and, unless told otherwise, launch them.

    In "launch" mode the commands come from ``--commandsFile`` or from the
    command given on the command line; in "resume" mode the batch identified
    by ``batch_uid`` is picked up again from its existing log folder.

    Parameters
    ----------
    argv : list of str, optional
        Arguments forwarded to `parse_arguments`. When None, argparse reads
        them from ``sys.argv``.

    Raises
    ------
    LookupError
        In resume mode, when no log folder exists for the requested batch UID.
    ValueError
        When the parsed mode is neither "launch" nor "resume".
    SystemExit
        With status 2 when the requested cores/gpus per command exceed what
        the queue's nodes offer or when the launcher fails; with the default
        status when the user declines to continue past failed commands.
    """
    # Necessary if we want 'logging.info' to appear in stderr.
    logging.root.setLevel(logging.INFO)

    args = parse_arguments(argv)
    path_smartdispatch_logs = pjoin(os.getcwd(), LOGS_FOLDERNAME)

    # Check if RESUME or LAUNCH mode
    if args.mode == "launch":
        if args.commandsFile is not None:
            # Commands are listed in a file.
            jobname = smartdispatch.generate_logfolder_name(os.path.basename(args.commandsFile.name), max_length=235)
            commands = smartdispatch.get_commands_from_file(args.commandsFile)
        else:
            # Command that needs to be parsed and unfolded.
            command = " ".join(args.commandAndOptions)
            jobname = smartdispatch.generate_name_from_command(command, max_length=235)
            commands = smartdispatch.unfold_command(command)

        commands = smartdispatch.replace_uid_tag(commands)
        nb_commands = len(commands)  # For print at the end

        # An explicit batch name takes precedence over the generated one.
        if args.batchName:
            jobname = smartdispatch.generate_logfolder_name(utils.slugify(args.batchName), max_length=235)

    elif args.mode == "resume":
        jobname = args.batch_uid
        if os.path.isdir(jobname):
            # We assume `jobname` is `path_job` repo, we extract the real `jobname`.
            jobname = os.path.basename(os.path.abspath(jobname))

        if not os.path.isdir(pjoin(path_smartdispatch_logs, jobname)):
            raise LookupError("Batch UID ({0}) does not exist! Cannot resume.".format(jobname))
    else:
        raise ValueError("Unknown subcommand!")

    job_folders_paths = smartdispatch.get_job_folders(path_smartdispatch_logs, jobname)
    path_job, path_job_logs, path_job_commands = job_folders_paths

    # Keep a log of the command line in the job folder.
    # NOTE(review): this records sys.argv even when an explicit `argv` was
    # passed to main() -- confirm that is intended.
    command_line = " ".join(sys.argv)
    smartdispatch.log_command_line(path_job, command_line)

    command_manager = CommandManager(pjoin(path_job_commands, "commands.txt"))

    # If resume mode, reset running jobs
    if args.mode == "launch":
        command_manager.set_commands_to_run(commands)
    elif args.mode == "resume":
        # Verifying if there are failed commands
        failed_commands = command_manager.get_failed_commands()
        if len(failed_commands) > 0:
            failed_command_message = dedent("""\
            {nb_failed} command(s) are in a failed state. They won't be resumed.
            Failed commands:
            {failed_commands}
            The actual errors can be found in the log folder under:
            {failed_commands_err_file}""")
            utils.print_boxed(failed_command_message.format(
                nb_failed=len(failed_commands),
                failed_commands=''.join(failed_commands),
                # c[:-1] drops the last character of each command
                # (presumably its trailing newline -- TODO confirm).
                failed_commands_err_file='\n'.join([utils.generate_uid_from_string(c[:-1]) + '.err' for c in failed_commands])
            ))

            if not utils.yes_no_prompt("Do you want to continue?", 'n'):
                # sys.exit() rather than the `exit()` helper, which is only
                # injected by the `site` module.
                sys.exit()

        # Running commands are only reset when the pool is not being expanded.
        if args.expandPool is None:
            command_manager.reset_running_commands()

        nb_commands = command_manager.get_nb_commands_to_run()

        if args.expandPool is not None:
            # Never start more workers than there are commands left to run.
            args.pool = min(nb_commands, args.expandPool)

    # If no pool size is specified the number of commands is taken
    if args.pool is None:
        args.pool = command_manager.get_nb_commands_to_run()

    # Generating all the worker commands
    worker_script = pjoin(os.path.dirname(smartdispatch.__file__), 'workers', 'base_worker.py')
    worker_script_flags = ''
    worker_call_prefix = ''
    worker_call_suffix = ''
    if args.autoresume:
        worker_script_flags = '-r'
        worker_call_prefix = AUTORESUME_WORKER_CALL_PREFIX
        worker_call_suffix = AUTORESUME_WORKER_CALL_SUFFIX

    # Formatted in two passes: everything but the worker ID first (`{{ID}}`
    # survives as `{ID}`), then once per worker with its ID.
    command_string = 'cd "{cwd}"; {worker_call_prefix}python2 {worker_script} {worker_script_flags} "{commands_file}" "{log_folder}" '\
                     '1>> "{log_folder}/worker/$PBS_JOBID\"\"_worker_{{ID}}.o" '\
                     '2>> "{log_folder}/worker/$PBS_JOBID\"\"_worker_{{ID}}.e" &'\
                     '{worker_call_suffix}'
    command_string = command_string.format(cwd=os.getcwd(), worker_call_prefix=worker_call_prefix, worker_script=worker_script,
                                           worker_script_flags=worker_script_flags, commands_file=command_manager._commands_filename,
                                           log_folder=path_job_logs, worker_call_suffix=worker_call_suffix)
    commands = [command_string.format(ID=i) for i in range(args.pool)]

    # TODO: use args.memPerNode instead of float('inf')
    queue = Queue(args.queueName, CLUSTER_NAME, args.walltime, args.coresPerNode, args.gpusPerNode, float('inf'), args.modules)

    # Check that requested core number does not exceed node total
    if args.coresPerCommand > queue.nb_cores_per_node:
        sys.stderr.write("smart-dispatch: error: coresPerCommand exceeds nodes total: asked {req_cores} cores, nodes have {node_cores}\n"
                         .format(req_cores=args.coresPerCommand, node_cores=queue.nb_cores_per_node))
        sys.exit(2)

    # Check that requested gpu number does not exceed node total
    if args.gpusPerCommand > queue.nb_gpus_per_node:
        sys.stderr.write("smart-dispatch: error: gpusPerCommand exceeds nodes total: asked {req_gpus} gpus, nodes have {node_gpus}\n"
                         .format(req_gpus=args.gpusPerCommand, node_gpus=queue.nb_gpus_per_node))
        sys.exit(2)

    command_params = {'nb_cores_per_command': args.coresPerCommand,
                      'nb_gpus_per_command': args.gpusPerCommand,
                      'mem_per_command': None  # args.memPerCommand
                      }

    # The launcher given on the command line overrides the cluster default.
    launcher = LAUNCHER if args.launcher is None else args.launcher

    prolog = []
    epilog = ['wait']
    if args.autoresume:
        prolog = [AUTORESUME_PROLOG]
        epilog = [AUTORESUME_EPILOG.format(launcher=launcher, path_job=path_job)]

    job_generator = job_generator_factory(queue, commands, prolog, epilog, command_params, CLUSTER_NAME, path_job)

    # generating default names per each jobs in each batch
    for pbs_id, pbs in enumerate(job_generator.pbs_list):
        proper_size_name = utils.jobname_generator(jobname, pbs_id)
        pbs.add_options(N=proper_size_name)

    if args.pbsFlags is not None:
        job_generator.add_pbs_flags(args.pbsFlags.split(' '))
    pbs_filenames = job_generator.write_pbs_files(path_job_commands)

    # Launch the jobs
    # Single-argument print(...) calls behave the same on Python 2 and 3.
    print("## {nb_commands} command(s) will be executed in {nb_jobs} job(s) ##".format(nb_commands=nb_commands, nb_jobs=len(pbs_filenames)))
    print("Batch UID:\n{batch_uid}".format(batch_uid=jobname))
    if not args.doNotLaunch:
        try:
            launch_jobs(launcher, pbs_filenames, CLUSTER_NAME, path_job)
        except subprocess.CalledProcessError as e:
            sys.stderr.write("smart-dispatch: error: The launcher wasn't able the launch the job(s) properly. Maybe the pbs file(s) generated were invalid. The following error message was returned: \n{}".format(e.output))
            sys.exit(2)

    print("\nLogs, command, and jobs id related to this batch will be in:\n {smartdispatch_folder}".format(smartdispatch_folder=path_job))
198
199
200
def parse_arguments(argv=None):
    """Parse and validate the command line of the dispatcher.

    Parameters
    ----------
    argv : list of str, optional
        Arguments to parse. When None, argparse reads ``sys.argv[1:]``.

    Returns
    -------
    argparse.Namespace
        The parsed arguments. ``mode`` holds the chosen subcommand ("launch"
        or "resume"); ``commandsFile``, when given, is a file object already
        opened for reading.

    Notes
    -----
    Invalid arguments are reported through ``parser.error``, which prints a
    usage message and exits with status 2.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-q', '--queueName', required=True, help='Queue used (ex: qwork@mp2, qfat256@mp2, gpu_1)')
    parser.add_argument('-n', '--batchName', required=False, help='The name of the batch. Default: The commands launched.')
    parser.add_argument('-t', '--walltime', required=False, help='Set the estimated running time of your jobs using the DD:HH:MM:SS format. Note that they will be killed when this time limit is reached.')
    parser.add_argument('-L', '--launcher', choices=['qsub', 'msub'], required=False, help='Which launcher to use. Default: qsub')
    parser.add_argument('-C', '--coresPerNode', type=int, required=False, help='How many cores there are per node.')
    parser.add_argument('-G', '--gpusPerNode', type=int, required=False, help='How many gpus there are per node.')
    # parser.add_argument('-M', '--memPerNode', type=int, required=False, help='How much memory there are per node (in Gb).')

    parser.add_argument('-c', '--coresPerCommand', type=int, required=False, help='How many cores a command needs.', default=1)
    parser.add_argument('-g', '--gpusPerCommand', type=int, required=False, help='How many gpus a command needs.', default=1)
    # parser.add_argument('-m', '--memPerCommand', type=float, required=False, help='How much memory a command needs (in Gb).')
    # argparse.FileType opens the file for reading like the Python-2-only
    # `file` builtin did, but an unreadable path becomes a regular argparse
    # error instead of an uncaught IOError.
    parser.add_argument('-f', '--commandsFile', type=argparse.FileType('r'), required=False, help='File containing commands to launch. Each command must be on a seperate line. (Replaces commandAndOptions)')

    parser.add_argument('-l', '--modules', type=str, required=False, help='List of additional modules to load.', nargs='+')
    parser.add_argument('-x', '--doNotLaunch', action='store_true', help='Generate all the files without launching the job.')
    parser.add_argument('-r', '--autoresume', action='store_true', help='Requeue the job when the running time hits the maximum walltime allowed on the cluster. Assumes that commands are resumable.')

    parser.add_argument('-p', '--pool', type=int, help="Number of workers that will be consuming commands. Default: Nb commands")
    parser.add_argument('--pbsFlags', type=str, help='ADVANCED USAGE: Allow to pass a space seperated list of PBS flags. Ex:--pbsFlags="-lfeature=k80 -t0-4"')
    subparsers = parser.add_subparsers(dest="mode")

    launch_parser = subparsers.add_parser('launch', help="Launch jobs.")
    launch_parser.add_argument("commandAndOptions", help="Options for the commands.", nargs=argparse.REMAINDER)

    resume_parser = subparsers.add_parser('resume', help="Resume jobs from batch UID.")
    # A bare `--expandPool` (no value) stores sys.maxsize.
    resume_parser.add_argument('--expandPool', type=int, nargs='?', const=sys.maxsize, help='Add workers to the given batch. Default: # pending jobs.')
    resume_parser.add_argument("batch_uid", help="Batch UID of the jobs to resume.")

    args = parser.parse_args(argv)

    # Check for invalid arguments that only matter when launching.
    if args.mode == "launch":
        if args.commandsFile is None and len(args.commandAndOptions) < 1:
            parser.error("You need to specify a command to launch.")
        # An unknown queue is acceptable only if the user describes its nodes
        # (cores and/or gpus) and gives a walltime.
        if args.queueName not in AVAILABLE_QUEUES and ((args.coresPerNode is None and args.gpusPerNode is None) or args.walltime is None):
            parser.error("Unknown queue, --coresPerNode/--gpusPerNode and --walltime must be set.")

    # main() uses coresPerCommand in both modes, so it is validated whatever
    # the mode; the message names the option that is actually checked.
    if args.coresPerCommand < 1:
        parser.error("coresPerCommand must be at least 1")

    return args
243
244
245
# Run the dispatcher only when this file is executed as a script.
if __name__ == "__main__":
    main()
247