Completed
Push — master ( 74470a...99859d )
by Mathieu
7s
created

scripts.parse_arguments()   D

Complexity

Conditions 8

Size

Total Lines 37

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 8
dl 0
loc 37
rs 4
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
import os
5
import sys
6
import argparse
7
import time as t
8
import numpy as np
9
from os.path import join as pjoin
10
from subprocess import check_output
11
from textwrap import dedent
12
13
from smartdispatch.command_manager import CommandManager
14
15
from smartdispatch.queue import Queue
16
from smartdispatch.job_generator import job_generator_factory
17
from smartdispatch import get_available_queues
18
from smartdispatch import utils
19
from smartdispatch.filelock import open_with_lock
20
21
import logging
22
import smartdispatch
23
24
LOGS_FOLDERNAME = "SMART_DISPATCH_LOGS"
25
CLUSTER_NAME = utils.detect_cluster()
26
AVAILABLE_QUEUES = get_available_queues(CLUSTER_NAME)
27
LAUNCHER = utils.get_launcher(CLUSTER_NAME)
28
29
30
def main():
31
    # Necessary if we want 'logging.info' to appear in stderr.
32
    logging.root.setLevel(logging.INFO)
33
34
    args = parse_arguments()
35
    path_smartdispatch_logs = pjoin(os.getcwd(), LOGS_FOLDERNAME)
36
37
    # Check if RESUME or LAUNCH mode
38
    if args.mode == "launch":
39
        if args.commandsFile is not None:
40
            # Commands are listed in a file.
41
            jobname = os.path.basename(args.commandsFile.name)
42
            commands = smartdispatch.get_commands_from_file(args.commandsFile)
43
        else:
44
            # Command that needs to be parsed and unfolded.
45
            command = " ".join(args.commandAndOptions)
46
            jobname = smartdispatch.generate_name_from_command(command, max_length=235)
47
            commands = smartdispatch.unfold_command(command)
48
49
        commands = smartdispatch.replace_uid_tag(commands)
50
        nb_commands = len(commands)  # For print at the end
51
52
    elif args.mode == "resume":
53
        jobname = args.batch_uid
54
        if os.path.isdir(jobname):
55
            # We assume `jobname` is `path_job` repo, we extract the real `jobname`.
56
            jobname = os.path.basename(os.path.abspath(jobname))
57
58
        if not os.path.isdir(pjoin(path_smartdispatch_logs, jobname)):
59
            raise LookupError("Batch UID ({0}) does not exist! Cannot resume.".format(jobname))
60
    else:
61
        raise ValueError("Unknown subcommand!")
62
63
    job_folders_paths = smartdispatch.get_job_folders(path_smartdispatch_logs, jobname)
64
    path_job, path_job_logs, path_job_commands = job_folders_paths
65
66
    # Keep a log of the command line in the job folder.
67
    command_line = " ".join(sys.argv)
68
    smartdispatch.log_command_line(path_job, command_line)
69
70
    command_manager = CommandManager(pjoin(path_job_commands, "commands.txt"))
71
72
    # If resume mode, reset running jobs
73
    if args.mode == "launch":
74
        command_manager.set_commands_to_run(commands)
75
    elif args.mode == "resume":
76
        # Verifying if there are failed commands
77
        failed_commands = command_manager.get_failed_commands()
78
        if len(failed_commands) > 0:
79
            FAILED_COMMAND_MESSAGE = dedent("""\
80
            {nb_failed} command(s) are in a failed state. They won't be resumed.
81
            Failed commands:
82
            {failed_commands}
83
            The actual errors can be found in the log folder under:
84
            {failed_commands_err_file}""")
85
            utils.print_boxed(FAILED_COMMAND_MESSAGE.format(
86
                nb_failed=len(failed_commands),
87
                failed_commands=''.join(failed_commands),
88
                failed_commands_err_file='\n'.join([utils.generate_uid_from_string(c[:-1]) + '.err' for c in failed_commands])
89
            ))
90
91
            if not utils.yes_no_prompt("Do you want to continue?", 'n'):
92
                exit()
93
94
        if not args.onlyPending:
95
            command_manager.reset_running_commands()
96
97
        nb_commands = command_manager.get_nb_commands_to_run()
98
99
    # If no pool size is specified the number of commands is taken
100
    if args.pool is None:
101
        args.pool = command_manager.get_nb_commands_to_run()
102
103
    # Generating all the worker commands
104
    COMMAND_STRING = 'cd "{cwd}"; smart_worker.py "{commands_file}" "{log_folder}" '\
105
                     '1>> "{log_folder}/worker/$PBS_JOBID\"\"_worker_{{ID}}.o" '\
106
                     '2>> "{log_folder}/worker/$PBS_JOBID\"\"_worker_{{ID}}.e" '
107
    COMMAND_STRING = COMMAND_STRING.format(cwd=os.getcwd(), commands_file=command_manager._commands_filename, log_folder=path_job_logs)
108
    commands = [COMMAND_STRING.format(ID=i) for i in range(args.pool)]
109
110
    # TODO: use args.memPerNode instead of args.memPerNode
111
    queue = Queue(args.queueName, CLUSTER_NAME, args.walltime, args.coresPerNode, args.gpusPerNode, np.inf, args.modules)
112
113
    command_params = {'nb_cores_per_command': args.coresPerCommand,
114
                      'nb_gpus_per_command': args.gpusPerCommand,
115
                      'mem_per_command': None  # args.memPerCommand
116
                      }
117
118
    job_generator = job_generator_factory(queue, commands, command_params, CLUSTER_NAME, path_job)
119
    pbs_filenames = job_generator.write_pbs_files(path_job_commands)
120
121
    # Launch the jobs
122
    print "## {nb_commands} command(s) will be executed in {nb_jobs} job(s) ##".format(nb_commands=nb_commands, nb_jobs=len(pbs_filenames))
123
    print "Batch UID:\n{batch_uid}".format(batch_uid=jobname)
124
    if not args.doNotLaunch:
125
        jobs_id = []
126
        for pbs_filename in pbs_filenames:
127
            qsub_output = check_output('{launcher} {pbs_filename}'.format(launcher=LAUNCHER if args.launcher is None else args.launcher, pbs_filename=pbs_filename), shell=True)
128
            jobs_id += [qsub_output.strip()]
129
130
        with open_with_lock(pjoin(path_job, "jobs_id.txt"), 'a') as jobs_id_file:
131
            jobs_id_file.writelines(t.strftime("## %Y-%m-%d %H:%M:%S ##\n"))
132
            jobs_id_file.writelines("\n".join(jobs_id) + "\n")
133
        print "\nJobs id:\n{jobs_id}".format(jobs_id=" ".join(jobs_id))
134
    print "\nLogs, command, and jobs id related to this batch will be in:\n {smartdispatch_folder}".format(smartdispatch_folder=path_job)
135
136
137
def parse_arguments():
138
    parser = argparse.ArgumentParser()
139
    parser.add_argument('-q', '--queueName', required=True, help='Queue used (ex: qwork@mp2, qfat256@mp2, qfat512@mp2)')
140
    parser.add_argument('-t', '--walltime', required=False, help='Set the estimated running time of your jobs using the DD:HH:MM:SS format. Note that they will be killed when this time limit is reached.')
141
    parser.add_argument('-L', '--launcher', choices=['qsub', 'msub'], required=False, help='Which launcher to use. Default: qsub')
142
    parser.add_argument('-C', '--coresPerNode', type=int, required=False, help='How many cores there are per node.')
143
    parser.add_argument('-G', '--gpusPerNode', type=int, required=False, help='How many gpus there are per node.')
144
    # parser.add_argument('-M', '--memPerNode', type=int, required=False, help='How much memory there are per node (in Gb).')
145
146
    parser.add_argument('-c', '--coresPerCommand', type=int, required=False, help='How many cores a command needs.', default=1)
147
    parser.add_argument('-g', '--gpusPerCommand', type=int, required=False, help='How many gpus a command needs.', default=1)
148
    # parser.add_argument('-m', '--memPerCommand', type=float, required=False, help='How much memory a command needs (in Gb).')
149
    parser.add_argument('-f', '--commandsFile', type=file, required=False, help='File containing commands to launch. Each command must be on a seperate line. (Replaces commandAndOptions)')
150
151
    parser.add_argument('-l', '--modules', type=str, required=False, help='List of additional modules to load.', nargs='+')
152
    parser.add_argument('-x', '--doNotLaunch', action='store_true', help='Creates the QSUB files without launching them.')
153
154
    parser.add_argument('-p', '--pool', type=int, help="Number of workers that will be consuming commands. Default: Nb commands")
155
    subparsers = parser.add_subparsers(dest="mode")
156
157
    launch_parser = subparsers.add_parser('launch', help="Launch jobs.")
158
    launch_parser.add_argument("commandAndOptions", help="Options for the commands.", nargs=argparse.REMAINDER)
159
160
    resume_parser = subparsers.add_parser('resume', help="Resume jobs from batch UID.")
161
    resume_parser.add_argument('--onlyPending', action='store_true', help='Resume only pending commands.')
162
    resume_parser.add_argument("batch_uid", help="Batch UID of the jobs to resume.")
163
164
    args = parser.parse_args()
165
166
    # Check for invalid arguments in
167
    if args.mode == "launch":
168
        if args.commandsFile is None and len(args.commandAndOptions) < 1:
169
            parser.error("You need to specify a command to launch.")
170
        if args.queueName not in AVAILABLE_QUEUES and ((args.coresPerNode is None and args.gpusPerNode is None) or args.walltime is None):
171
            parser.error("Unknown queue, --coresPerNode/--gpusPerNode and --walltime must be set.")
172
173
    return args
174
175
176
if __name__ == "__main__":
177
    main()
178