launch_jobs()   B
last analyzed

Complexity

Conditions 6

Size

Total Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 6
c 1
b 0
f 1
dl 0
loc 32
rs 7.5384
1
from __future__ import absolute_import
2
3
import os
4
import re
5
import itertools
6
import time as t
7
from os.path import join as pjoin
8
from subprocess import check_output
9
10
import smartdispatch
11
from smartdispatch import utils
12
from smartdispatch.filelock import open_with_lock
13
from smartdispatch.argument_template import argument_templates
14
15
# Placeholder token in commands; `replace_uid_tag` substitutes this same
# "{UID}" text with a UID computed from each command.
UID_TAG = "{UID}"
16
17
18
def generate_name_from_command(command, max_length_arg=None, max_length=None):
    ''' Generates a name from a given command.

    The command is split on whitespace and every argument is slugified. When
    `max_length_arg` is given, each slugified argument is trimmed to its last
    `max_length_arg` characters. The pieces are then joined with underscores.
    Finally, `generate_logfolder_name` prefixes the result with a timestamp
    and applies `max_length`.

    Parameters
    ----------
    command : str
        command from which to generate the name
    max_length_arg : int
        arguments longer than this will be trimmed keeping their last
        characters (Default: no trimming). The sign of the value is ignored.
        Note that 0 disables trimming (it does not produce empty arguments).
    max_length : int
        trim the final name (timestamp prefix included) to this many
        characters, keeping the *first* characters (Default: no trimming)

    Returns
    -------
    name : str
        timestamp-prefixed name built from the slugified arguments
    '''
    if max_length_arg is not None:
        # A negative slice start keeps the last `max_length_arg` characters.
        max_length_arg = -abs(max_length_arg)

    slugified_args = [utils.slugify(argvalue)[max_length_arg:] for argvalue in command.split()]
    return generate_logfolder_name('_'.join(slugified_args), max_length)
42
43
44
def generate_logfolder_name(name, max_length=None):
    """ Prefixes `name` with the current local date and time.

    Parameters
    ----------
    name : str
        text appended right after the timestamp
    max_length : int
        if given, the result is cut to its first `max_length` characters
        (Default: no cut)

    Returns
    -------
    folder_name : str
        "YYYY-MM-DD_HH-MM-SS_" followed by `name`, possibly truncated
    """
    timestamp = t.strftime("%Y-%m-%d_%H-%M-%S_")
    return (timestamp + name)[:max_length]
48
49
50
def get_commands_from_file(fileobj):
    """ Collects the non-blank lines of `fileobj` as commands.

    Parameters
    ----------
    fileobj : file
        opened file (or any iterable of lines) to read commands from

    Returns
    -------
    commands : list of str
        every non-blank line, stripped of surrounding whitespace, in file order
    """
    stripped_lines = (raw_line.strip() for raw_line in fileobj)
    return [stripped for stripped in stripped_lines if stripped]
65
66
67
def unfold_command(command):
    ''' Expands every folded argument of `command` into all the commands it stands for.

    Each occurrence of an argument template (see *Arguments templates*) is
    unfolded into its list of values, while text outside templates is kept
    verbatim. The returned commands are the Cartesian product of all these
    choices, in the order produced by `itertools.product`.

    Parameters
    ----------
    command : str
        command to unfold. The escaped text is scanned with `re.finditer`,
        so a single string is expected (it was previously documented as
        "list of str"; confirm against `utils.encode_escaped_characters`).

    Returns
    -------
    commands : list of str
        commands obtained after unfolding `command`

    Arguments templates
    -------------------
    *list*: "[item1 item2 ... itemN]"
    *range*: "[start:end]" or "[start:end:step]"
    '''
    text = utils.encode_escaped_characters(command)

    # One named alternative per template, so a match reveals which template fired.
    alternatives = "|".join("(?P<{0}>{1})".format(name, template.regex)
                            for name, template in argument_templates.items())
    master_regex = "(" + alternatives + ")"

    pieces = []
    cursor = 0
    for match in re.finditer(master_regex, text):
        # Literal text since the previous template: exactly one choice.
        pieces.append([text[cursor:match.start()]])

        # The first non-None named group identifies the matching template.
        template_name, template_text = next(
            (key, value) for key, value in match.groupdict().items() if value is not None)
        pieces.append(argument_templates[template_name].unfold(template_text))
        cursor = match.end()

    # Literal text after the last template.
    pieces.append([text[cursor:]])

    decoded_pieces = [[utils.decode_escaped_characters(choice) for choice in choices]
                      for choices in pieces]
    return ["".join(combination) for combination in itertools.product(*decoded_pieces)]
108
109
110
def replace_uid_tag(commands):
    """ Replaces the `UID_TAG` placeholder in each command by a UID.

    Parameters
    ----------
    commands : list of str
        commands possibly containing the `UID_TAG` placeholder

    Returns
    -------
    commands : list of str
        new list in which every occurrence of `UID_TAG` in a command is
        replaced by `utils.generate_uid_from_string(command)`. The UID is
        computed from the original command text, so identical commands get
        identical UIDs provided that helper is deterministic.
    """
    # Use the module-level constant rather than repeating the "{UID}" literal.
    return [command.replace(UID_TAG, utils.generate_uid_from_string(command)) for command in commands]
112
113
114
def get_available_queues(cluster_name):
    """ Fetches all available queues on the current cluster.

    Queue information is read from the file
    `<smartdispatch package dir>/config/<cluster_name>.json`.

    Parameters
    ----------
    cluster_name : str or None
        name of the cluster; None means no cluster is known

    Returns
    -------
    queues_infos : dict
        the config file's content as loaded by
        `utils.load_dict_from_json_file`, or an empty dict when
        `cluster_name` is None or has no config file
    """
    if cluster_name is None:
        return {}

    package_dir = os.path.dirname(smartdispatch.__file__)
    config_filepath = pjoin(package_dir, 'config', cluster_name + ".json")
    if os.path.isfile(config_filepath):
        return utils.load_dict_from_json_file(config_filepath)

    return {}  # Unknown cluster
130
131
132
def get_job_folders(path, jobname, create_if_needed=False):
    """ Get all folder paths for a specific job, creating missing folders.

    The following directories are created when absent:
    `<path>/<jobname>/commands`, `<path>/<jobname>/logs`,
    `<path>/<jobname>/logs/worker` and `<path>/<jobname>/logs/job`.

    Parameters
    ----------
    path : str
        parent directory of the job folder
    jobname : str
        name of the job folder
    create_if_needed : bool
        currently ignored: missing folders are always created. Kept for
        backward compatibility with existing callers.

    Returns
    -------
    path_job, path_job_logs, path_job_commands : str
        the job folder, its 'logs' sub-folder and its 'commands' sub-folder

    Raises
    ------
    OSError
        if a folder is missing and cannot be created
    """
    path_job = pjoin(path, jobname)
    path_job_logs = pjoin(path_job, 'logs')
    path_job_commands = pjoin(path_job, 'commands')

    for folder in (path_job_commands, path_job_logs,
                   pjoin(path_job_logs, "worker"), pjoin(path_job_logs, "job")):
        _makedirs_if_missing(folder)

    return path_job, path_job_logs, path_job_commands


def _makedirs_if_missing(folder):
    """ Creates `folder` (and its parents) unless it already is a directory. """
    if os.path.isdir(folder):
        return
    try:
        os.makedirs(folder)
    except OSError:
        # Another process may have created the folder after the isdir()
        # check above; only fail if it is still missing.
        if not os.path.isdir(folder):
            raise
148
149
150
def log_command_line(path_job, command_line):
    """ Logs a command line in a job folder.

    The command line is appended to a file named 'command_line.log' that
    resides in the given job folder. It is preceded by a
    "## YYYY-MM-DD HH:MM:SS ##" line holding the current date and time, and
    followed by a blank line.

    Parameters
    ----------
    path_job : str
        path to the job folder
    command_line : str
        command line to log

    Notes
    -----
    The logged command may differ from `sys.argv`, because it is rewritten so
    that it can be pasted as-is in a terminal:
    - every double quote `"` is escaped as `\\"`;
    - every square-bracket group that contains an escaped space (a backslash
      followed by a space) is wrapped in double quotes.
    """
    # Make sure we can paste the command line as-is (done before taking the lock).
    command_line = command_line.replace('"', r'\"')
    command_line = re.sub(r'(\[)([^\[\]]*\\ [^\[\]]*)(\])', r'"\1\2\3"', command_line)

    with open_with_lock(pjoin(path_job, "command_line.log"), 'a') as command_line_log:
        command_line_log.write(t.strftime("## %Y-%m-%d %H:%M:%S ##\n"))
        command_line_log.write(command_line + "\n\n")
168
169
170
def launch_jobs(launcher, pbs_filenames, cluster_name, path_job):  # pragma: no cover
    ''' Invokes launcher on a set of PBS files.

    Each PBS file is submitted by running, through the shell,
    ``PBS_FILENAME=<pbs_filename> <launcher> <pbs_filename>``. The stripped
    output of that command is taken as the job id. On 'helios', that id is
    then mapped to the matching PBS job id found in ``qstat -f``. All ids are
    appended to 'jobs_id.txt' in `path_job`, under a timestamp header, and
    printed.

    Parameters
    ----------
    launcher : str
        launcher name (inserted verbatim in a shell command line)
    pbs_filenames : list of str
        a list of PBS files to launch
    cluster_name : str
        cluster name
    path_job : str
        path to the job folder

    Raises
    ------
    subprocess.CalledProcessError
        if the launcher (or ``qstat``) exits with a non-zero status
    '''
    jobs_id = []
    for pbs_filename in pbs_filenames:
        # universal_newlines=True yields text (str) on Python 3 as well as Python 2.
        launcher_output = check_output('PBS_FILENAME={pbs_filename} {launcher} {pbs_filename}'.format(
            launcher=launcher, pbs_filename=pbs_filename), shell=True, universal_newlines=True)
        job_id = launcher_output.strip()

        # On some clusters, SRMJID and PBS_JOBID don't match
        if cluster_name in ['helios']:
            # Escape the id so characters such as '.' match literally, and keep
            # searching for the launcher's id (not an already-replaced one).
            srmjid_regex = re.compile(r"SRMJID:" + re.escape(job_id))
            qstat_jobs = check_output(['qstat', '-f'], universal_newlines=True).split('Job Id: ')
            for job in qstat_jobs:
                if srmjid_regex.search(job):
                    # Each "Job Id: " section starts with the PBS job id.
                    job_id = re.match(r"[0-9a-zA-Z.-]*", job).group()
                    break

        jobs_id.append(job_id)

    with open_with_lock(pjoin(path_job, "jobs_id.txt"), 'a') as jobs_id_file:
        jobs_id_file.write(t.strftime("## %Y-%m-%d %H:%M:%S ##\n"))
        jobs_id_file.write("\n".join(jobs_id) + "\n")
    # Parenthesized single argument: same output as the Python 2 print statement.
    print("\nJobs id:\n{jobs_id}".format(jobs_id=" ".join(jobs_id)))
202