Completed
Pull Request — master (#98)
by Marc-Alexandre
55s
created

scripts.create_job_folders()   A

Complexity
  Conditions: 3

Size
  Total Lines: 13

Duplication
  Lines: 0
  Ratio: 0 %

Metric    Value
cc        3
dl        0
loc       13
rs        9.4285
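The abbreviations in the table mirror the labelled values above it: cc matches the 3 conditions (cyclomatic complexity), dl the 0 duplicated lines, and loc the 13 total lines; rs is presumably a rating or maintainability-style score. The report does not say which analyzer produced these figures, so the sketch below is only a way to reproduce comparable numbers locally. It assumes the radon package and a hypothetical path to the reviewed script; radon's maintainability index is at best a loose analogue of rs.

# Hypothetical local check of the complexity/size figures using radon.
# Assumptions: a radon-like analyzer and a made-up path to the reviewed file.
from radon.complexity import cc_visit
from radon.raw import analyze
from radon.metrics import mi_visit

with open("scripts/smart_dispatch.py") as f:  # hypothetical path, not shown in the report
    source = f.read()

# One entry per function/method; the report lists create_job_folders() with cc 3.
for block in cc_visit(source):
    print("{0}: cc={1}".format(block.name, block.complexity))

# Raw line counts, comparable to the "Total Lines" / loc figures.
raw = analyze(source)
print("loc={0}, sloc={1}, blank={2}".format(raw.loc, raw.sloc, raw.blank))

# Maintainability index (0-100); only a rough analogue of the report's rs value.
print("mi={0:.2f}".format(mi_visit(source, multi=True)))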
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import argparse
import time as t
import numpy as np
from subprocess import check_output
from textwrap import dedent
from os.path import join as pjoin

from smartdispatch.command_manager import CommandManager

from smartdispatch.queue import Queue
from smartdispatch.job_generator import job_generator_factory
from smartdispatch import get_available_queues
from smartdispatch import utils

import logging
import smartdispatch

LOGS_FOLDERNAME = "SMART_DISPATCH_LOGS"
CLUSTER_NAME = utils.detect_cluster()
AVAILABLE_QUEUES = get_available_queues(CLUSTER_NAME)
LAUNCHER = utils.get_launcher(CLUSTER_NAME)


def main():
    # Necessary if we want 'logging.info' to appear in stderr.
    logging.root.setLevel(logging.INFO)

    args = parse_arguments()
    path_smartdispatch_logs = pjoin(os.getcwd(), LOGS_FOLDERNAME)

    # Check if RESUME or LAUNCH mode
    if args.mode == "launch":
        if args.commandsFile is not None:
            # Commands are listed in a file.
            jobname = os.path.basename(args.commandsFile.name)
            commands = smartdispatch.get_commands_from_file(args.commandsFile)
        else:
            # Command that needs to be parsed and unfolded.
            command = " ".join(args.commandAndOptions)
            jobname = smartdispatch.generate_name_from_command(command, max_length=235)
            commands = smartdispatch.unfold_command(command)

        commands = smartdispatch.replace_uid_tag(commands)
        nb_commands = len(commands)  # For the summary printed at the end.

    elif args.mode == "resume":
        jobname = args.batch_uid
        if os.path.isdir(jobname):
            # We assume `jobname` is a `path_job` directory; extract the real `jobname` from it.
            jobname = os.path.basename(os.path.abspath(jobname))

        if not os.path.isdir(pjoin(path_smartdispatch_logs, jobname)):
            raise LookupError("Batch UID ({0}) does not exist! Cannot resume.".format(jobname))
    else:
        raise ValueError("Unknown subcommand!")

    job_folders_paths = smartdispatch.get_job_folders(path_smartdispatch_logs, jobname)
    path_job, path_job_logs, path_job_commands = job_folders_paths

    command_manager = CommandManager(os.path.join(path_job_commands, "commands.txt"))

    # In launch mode, register the commands to run; in resume mode, reset running commands.
    if args.mode == "launch":
        command_manager.set_commands_to_run(commands)
    elif args.mode == "resume":
        # Check whether there are failed commands.
        failed_commands = command_manager.get_failed_commands()
        if len(failed_commands) > 0:
            FAILED_COMMAND_MESSAGE = dedent("""\
            {nb_failed} command(s) are in a failed state. They won't be resumed.
            Failed commands:
            {failed_commands}
            The actual errors can be found in the log folder under:
            {failed_commands_err_file}""")
            utils.print_boxed(FAILED_COMMAND_MESSAGE.format(
                nb_failed=len(failed_commands),
                failed_commands=''.join(failed_commands),
                failed_commands_err_file='\n'.join([utils.generate_uid_from_string(c[:-1]) + '.err' for c in failed_commands])
            ))

            if not utils.yes_no_prompt("Do you want to continue?", 'n'):
                exit()

        command_manager.reset_running_commands()
        nb_commands = command_manager.get_nb_commands_to_run()

    # If no pool size is specified, use one worker per command to run.
    if args.pool is None:
        args.pool = command_manager.get_nb_commands_to_run()

    # Generate the worker commands. `{{ID}}` survives the first `.format()` call
    # as `{ID}` and is filled in per worker in the list comprehension below.
    COMMAND_STRING = 'cd "{cwd}"; smart_worker.py "{commands_file}" "{log_folder}" '\
                     '1>> "{log_folder}/worker/$PBS_JOBID\"\"_worker_{{ID}}.o" '\
                     '2>> "{log_folder}/worker/$PBS_JOBID\"\"_worker_{{ID}}.e" '
    COMMAND_STRING = COMMAND_STRING.format(cwd=os.getcwd(), commands_file=command_manager._commands_filename, log_folder=path_job_logs)
    commands = [COMMAND_STRING.format(ID=i) for i in range(args.pool)]

    # TODO: use args.memPerNode instead of np.inf.
    queue = Queue(args.queueName, CLUSTER_NAME, args.walltime, args.coresPerNode, args.gpusPerNode, np.inf, args.modules)

    command_params = {'nb_cores_per_command': args.coresPerCommand,
                      'nb_gpus_per_command': args.gpusPerCommand,
                      'mem_per_command': None  # args.memPerCommand
                      }

    job_generator = job_generator_factory(queue, commands, command_params, CLUSTER_NAME, path_job)
    pbs_filenames = job_generator.write_pbs_files(path_job_commands)

    # Launch the jobs
    print "## {nb_commands} command(s) will be executed in {nb_jobs} job(s) ##".format(nb_commands=nb_commands, nb_jobs=len(pbs_filenames))
    print "Batch UID:\n{batch_uid}".format(batch_uid=jobname)
    if not args.doNotLaunch:
        jobs_id = []
        for pbs_filename in pbs_filenames:
            qsub_output = check_output('{launcher} {pbs_filename}'.format(launcher=LAUNCHER if args.launcher is None else args.launcher, pbs_filename=pbs_filename), shell=True)
            jobs_id += [qsub_output.strip()]

        with utils.open_with_lock(os.path.join(path_job, "jobs_id.txt"), 'a') as jobs_id_file:
            jobs_id_file.writelines(t.strftime("## %Y-%m-%d %H:%M:%S ##\n"))
            jobs_id_file.writelines("\n".join(jobs_id) + "\n")
        print "\nJobs id:\n{jobs_id}".format(jobs_id=" ".join(jobs_id))
    print "\nLogs, command, and jobs id related to this batch will be in:\n {smartdispatch_folder}".format(smartdispatch_folder=path_job)


def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument('-q', '--queueName', required=True, help='Queue used (ex: qwork@mp2, qfat256@mp2, qfat512@mp2)')
    parser.add_argument('-t', '--walltime', required=False, help='Set the estimated running time of your jobs using the DD:HH:MM:SS format. Note that they will be killed when this time limit is reached.')
    parser.add_argument('-L', '--launcher', choices=['qsub', 'msub'], required=False, help='Which launcher to use. Default: qsub')
    parser.add_argument('-C', '--coresPerNode', type=int, required=False, help='How many cores there are per node.')
    parser.add_argument('-G', '--gpusPerNode', type=int, required=False, help='How many gpus there are per node.')
    # parser.add_argument('-M', '--memPerNode', type=int, required=False, help='How much memory there is per node (in Gb).')

    parser.add_argument('-c', '--coresPerCommand', type=int, required=False, help='How many cores a command needs.', default=1)
    parser.add_argument('-g', '--gpusPerCommand', type=int, required=False, help='How many gpus a command needs.', default=1)
    # parser.add_argument('-m', '--memPerCommand', type=float, required=False, help='How much memory a command needs (in Gb).')
    parser.add_argument('-f', '--commandsFile', type=file, required=False, help='File containing commands to launch. Each command must be on a separate line. (Replaces commandAndOptions)')

    parser.add_argument('-l', '--modules', type=str, required=False, help='List of additional modules to load.', nargs='+')
    parser.add_argument('-x', '--doNotLaunch', action='store_true', help='Creates the QSUB files without launching them.')

    parser.add_argument('-p', '--pool', type=int, help="Number of workers that will be consuming commands. Default: Nb commands")
    subparsers = parser.add_subparsers(dest="mode")

    launch_parser = subparsers.add_parser('launch', help="Launch jobs.")
    launch_parser.add_argument("commandAndOptions", help="Options for the commands.", nargs=argparse.REMAINDER)

    resume_parser = subparsers.add_parser('resume', help="Resume jobs from batch UID.")
    resume_parser.add_argument("batch_uid", help="Batch UID of the jobs to resume.")

    args = parser.parse_args()

    # Check for invalid arguments in launch mode.
    if args.mode == "launch":
        if args.commandsFile is None and len(args.commandAndOptions) < 1:
            parser.error("You need to specify a command to launch.")
        if args.queueName not in AVAILABLE_QUEUES and ((args.coresPerNode is None and args.gpusPerNode is None) or args.walltime is None):
            parser.error("Unknown queue, --coresPerNode/--gpusPerNode and --walltime must be set.")

    return args


if __name__ == "__main__":
    main()
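A note on the worker-pool logic in main() above: the script does not submit the user's commands directly. It writes them to commands.txt and launches args.pool copies of smart_worker.py that consume that file, and COMMAND_STRING is filled in two passes of str.format(): the doubled braces in {{ID}} survive the first pass as {ID} and are only resolved per worker in the list comprehension. Below is a minimal, self-contained sketch of that two-stage formatting, using made-up paths and pool size rather than the report's values.

# Two-stage formatting, as used for COMMAND_STRING in the script above.
# The paths and pool size here are invented for illustration only.
template = ('cd "{cwd}"; smart_worker.py "{commands_file}" "{log_folder}" '
            '1>> "{log_folder}/worker/$PBS_JOBID""_worker_{{ID}}.o" '
            '2>> "{log_folder}/worker/$PBS_JOBID""_worker_{{ID}}.e" ')

# First pass: fills cwd, commands_file and log_folder; {{ID}} becomes {ID}.
template = template.format(cwd="/home/user/exp",
                           commands_file="/home/user/exp/commands.txt",
                           log_folder="/home/user/exp/logs")

# Second pass: one concrete command per worker in the pool.
pool_size = 3
worker_commands = [template.format(ID=i) for i in range(pool_size)]
for cmd in worker_commands:
    print(cmd)

Judging from parse_arguments(), a typical run would look like "smart-dispatch -q qwork@mp2 launch python my_script.py", with "smart-dispatch -q qwork@mp2 resume <batch_uid>" used later to retry unfinished commands; the executable name is an assumption here, since the report only identifies the module as "scripts".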