Completed
Push — master ( 2a4f2c...e53b42 )
by Mathieu
54s
created

scripts.get_job_folders()   B

Complexity

Conditions 5

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 5
dl 0
loc 14
rs 8.5454
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
import os
5
import sys
6
import argparse
7
import time as t
8
import numpy as np
9
from os.path import join as pjoin
10
from subprocess import check_output
11
from textwrap import dedent
12
13
from smartdispatch.command_manager import CommandManager
14
15
from smartdispatch.queue import Queue
16
from smartdispatch.job_generator import job_generator_factory
17
from smartdispatch import get_available_queues
18
from smartdispatch import utils
19
20
import logging
21
import smartdispatch
22
23
LOGS_FOLDERNAME = "SMART_DISPATCH_LOGS"
24
CLUSTER_NAME = utils.detect_cluster()
25
AVAILABLE_QUEUES = get_available_queues(CLUSTER_NAME)
26
LAUNCHER = utils.get_launcher(CLUSTER_NAME)
27
28
29
def main():
30
    # Necessary if we want 'logging.info' to appear in stderr.
31
    logging.root.setLevel(logging.INFO)
32
33
    args = parse_arguments()
34
    path_smartdispatch_logs = pjoin(os.getcwd(), LOGS_FOLDERNAME)
35
36
    # Check if RESUME or LAUNCH mode
37
    if args.mode == "launch":
38
        if args.commandsFile is not None:
39
            # Commands are listed in a file.
40
            jobname = os.path.basename(args.commandsFile.name)
41
            commands = smartdispatch.get_commands_from_file(args.commandsFile)
42
        else:
43
            # Command that needs to be parsed and unfolded.
44
            command = " ".join(args.commandAndOptions)
45
            jobname = smartdispatch.generate_name_from_command(command, max_length=235)
46
            commands = smartdispatch.unfold_command(command)
47
48
        commands = smartdispatch.replace_uid_tag(commands)
49
        nb_commands = len(commands)  # For print at the end
50
51
    elif args.mode == "resume":
52
        jobname = args.batch_uid
53
        if os.path.isdir(jobname):
54
            # We assume `jobname` is `path_job` repo, we extract the real `jobname`.
55
            jobname = os.path.basename(os.path.abspath(jobname))
56
57
        if not os.path.isdir(pjoin(path_smartdispatch_logs, jobname)):
58
            raise LookupError("Batch UID ({0}) does not exist! Cannot resume.".format(jobname))
59
    else:
60
        raise ValueError("Unknown subcommand!")
61
62
    job_folders_paths = smartdispatch.get_job_folders(path_smartdispatch_logs, jobname)
63
    path_job, path_job_logs, path_job_commands = job_folders_paths
64
65
    # Keep a log of the command line in the job folder.
66
    command_line = " ".join(sys.argv)
67
    smartdispatch.log_command_line(path_job, command_line)
68
69
    command_manager = CommandManager(pjoin(path_job_commands, "commands.txt"))
70
71
    # If resume mode, reset running jobs
72
    if args.mode == "launch":
73
        command_manager.set_commands_to_run(commands)
74
    elif args.mode == "resume":
75
        # Verifying if there are failed commands
76
        failed_commands = command_manager.get_failed_commands()
77
        if len(failed_commands) > 0:
78
            FAILED_COMMAND_MESSAGE = dedent("""\
79
            {nb_failed} command(s) are in a failed state. They won't be resumed.
80
            Failed commands:
81
            {failed_commands}
82
            The actual errors can be found in the log folder under:
83
            {failed_commands_err_file}""")
84
            utils.print_boxed(FAILED_COMMAND_MESSAGE.format(
85
                nb_failed=len(failed_commands),
86
                failed_commands=''.join(failed_commands),
87
                failed_commands_err_file='\n'.join([utils.generate_uid_from_string(c[:-1]) + '.err' for c in failed_commands])
88
            ))
89
90
            if not utils.yes_no_prompt("Do you want to continue?", 'n'):
91
                exit()
92
93
        if not args.onlyPending:
94
            command_manager.reset_running_commands()
95
96
        nb_commands = command_manager.get_nb_commands_to_run()
97
98
    # If no pool size is specified the number of commands is taken
99
    if args.pool is None:
100
        args.pool = command_manager.get_nb_commands_to_run(command_line)
101
102
    # Generating all the worker commands
103
    COMMAND_STRING = 'cd "{cwd}"; smart_worker.py "{commands_file}" "{log_folder}" '\
104
                     '1>> "{log_folder}/worker/$PBS_JOBID\"\"_worker_{{ID}}.o" '\
105
                     '2>> "{log_folder}/worker/$PBS_JOBID\"\"_worker_{{ID}}.e" '
106
    COMMAND_STRING = COMMAND_STRING.format(cwd=os.getcwd(), commands_file=command_manager._commands_filename, log_folder=path_job_logs)
107
    commands = [COMMAND_STRING.format(ID=i) for i in range(args.pool)]
108
109
    # TODO: use args.memPerNode instead of args.memPerNode
110
    queue = Queue(args.queueName, CLUSTER_NAME, args.walltime, args.coresPerNode, args.gpusPerNode, np.inf, args.modules)
111
112
    command_params = {'nb_cores_per_command': args.coresPerCommand,
113
                      'nb_gpus_per_command': args.gpusPerCommand,
114
                      'mem_per_command': None  # args.memPerCommand
115
                      }
116
117
    job_generator = job_generator_factory(queue, commands, command_params, CLUSTER_NAME, path_job)
118
    pbs_filenames = job_generator.write_pbs_files(path_job_commands)
119
120
    # Launch the jobs
121
    print "## {nb_commands} command(s) will be executed in {nb_jobs} job(s) ##".format(nb_commands=nb_commands, nb_jobs=len(pbs_filenames))
122
    print "Batch UID:\n{batch_uid}".format(batch_uid=jobname)
123
    if not args.doNotLaunch:
124
        jobs_id = []
125
        for pbs_filename in pbs_filenames:
126
            qsub_output = check_output('{launcher} {pbs_filename}'.format(launcher=LAUNCHER if args.launcher is None else args.launcher, pbs_filename=pbs_filename), shell=True)
127
            jobs_id += [qsub_output.strip()]
128
129
        with utils.open_with_lock(pjoin(path_job, "jobs_id.txt"), 'a') as jobs_id_file:
130
            jobs_id_file.writelines(t.strftime("## %Y-%m-%d %H:%M:%S ##\n"))
131
            jobs_id_file.writelines("\n".join(jobs_id) + "\n")
132
        print "\nJobs id:\n{jobs_id}".format(jobs_id=" ".join(jobs_id))
133
    print "\nLogs, command, and jobs id related to this batch will be in:\n {smartdispatch_folder}".format(smartdispatch_folder=path_job)
134
135
136
def parse_arguments():
137
    parser = argparse.ArgumentParser()
138
    parser.add_argument('-q', '--queueName', required=True, help='Queue used (ex: qwork@mp2, qfat256@mp2, qfat512@mp2)')
139
    parser.add_argument('-t', '--walltime', required=False, help='Set the estimated running time of your jobs using the DD:HH:MM:SS format. Note that they will be killed when this time limit is reached.')
140
    parser.add_argument('-L', '--launcher', choices=['qsub', 'msub'], required=False, help='Which launcher to use. Default: qsub')
141
    parser.add_argument('-C', '--coresPerNode', type=int, required=False, help='How many cores there are per node.')
142
    parser.add_argument('-G', '--gpusPerNode', type=int, required=False, help='How many gpus there are per node.')
143
    # parser.add_argument('-M', '--memPerNode', type=int, required=False, help='How much memory there are per node (in Gb).')
144
145
    parser.add_argument('-c', '--coresPerCommand', type=int, required=False, help='How many cores a command needs.', default=1)
146
    parser.add_argument('-g', '--gpusPerCommand', type=int, required=False, help='How many gpus a command needs.', default=1)
147
    # parser.add_argument('-m', '--memPerCommand', type=float, required=False, help='How much memory a command needs (in Gb).')
148
    parser.add_argument('-f', '--commandsFile', type=file, required=False, help='File containing commands to launch. Each command must be on a seperate line. (Replaces commandAndOptions)')
149
150
    parser.add_argument('-l', '--modules', type=str, required=False, help='List of additional modules to load.', nargs='+')
151
    parser.add_argument('-x', '--doNotLaunch', action='store_true', help='Creates the QSUB files without launching them.')
152
153
    parser.add_argument('-p', '--pool', type=int, help="Number of workers that will be consuming commands. Default: Nb commands")
154
    subparsers = parser.add_subparsers(dest="mode")
155
156
    launch_parser = subparsers.add_parser('launch', help="Launch jobs.")
157
    launch_parser.add_argument("commandAndOptions", help="Options for the commands.", nargs=argparse.REMAINDER)
158
159
    resume_parser = subparsers.add_parser('resume', help="Resume jobs from batch UID.")
160
    resume_parser.add_argument('--onlyPending', action='store_true', help='Resume only pending commands.')
161
    resume_parser.add_argument("batch_uid", help="Batch UID of the jobs to resume.")
162
163
    args = parser.parse_args()
164
165
    # Check for invalid arguments in
166
    if args.mode == "launch":
167
        if args.commandsFile is None and len(args.commandAndOptions) < 1:
168
            parser.error("You need to specify a command to launch.")
169
        if args.queueName not in AVAILABLE_QUEUES and ((args.coresPerNode is None and args.gpusPerNode is None) or args.walltime is None):
170
            parser.error("Unknown queue, --coresPerNode/--gpusPerNode and --walltime must be set.")
171
172
    return args
173
174
175
if __name__ == "__main__":
176
    main()
177