1
|
|
|
from __future__ import absolute_import |
2
|
|
|
|
3
|
|
|
import os |
4
|
|
|
import re |
5
|
|
|
from smartdispatch.pbs import PBS |
6
|
|
|
from smartdispatch import utils |
7
|
|
|
|
8
|
|
|
|
9
|
|
|
def job_generator_factory(queue, commands, prolog=None, epilog=None, command_params=None, cluster_name=None, base_path="./"):
    """ Builds the job generator suited to a given cluster.

    Parameters
    ----------
    queue : `Queue` instance
        queue on which commands will be executed
    commands : list of str
        commands to put in PBS files
    prolog : list of str, optional
        code to execute before the commands (default: empty list)
    epilog : list of str, optional
        code to execute after the commands (default: empty list)
    command_params : dict, optional
        information about the commands (default: empty dict)
    cluster_name : str, optional
        "guillimin", "mammouth", "helios" and "hades" each get their dedicated
        generator. For any other value, a `SlurmJobGenerator` is used when
        `utils.get_launcher(cluster_name)` returns "sbatch", and a plain
        `JobGenerator` otherwise.
    base_path : str, optional
        forwarded untouched to the generator

    Returns
    -------
    generator : `JobGenerator` instance (possibly of a subclass)
    """
    # `None` sentinels instead of mutable defaults: every call gets its own
    # fresh containers rather than objects shared through the signature.
    prolog = [] if prolog is None else prolog
    epilog = [] if epilog is None else epilog
    command_params = {} if command_params is None else command_params

    if cluster_name == "guillimin":
        generator_class = GuilliminJobGenerator
    elif cluster_name == "mammouth":
        generator_class = MammouthJobGenerator
    elif cluster_name == "helios":
        generator_class = HeliosJobGenerator
    elif cluster_name == "hades":
        generator_class = HadesJobGenerator
    elif utils.get_launcher(cluster_name) == "sbatch":
        # The launcher is only looked up for clusters without a dedicated generator.
        generator_class = SlurmJobGenerator
    else:
        generator_class = JobGenerator

    return generator_class(queue, commands, prolog, epilog, command_params, base_path)
22
|
|
|
|
23
|
|
|
|
24
|
|
|
class JobGenerator(object):

    """ Offers functionalities to generate PBS files for a given queue.

    Parameters
    ----------
    queue : `Queue` instance
        queue on which commands will be executed
    commands : list of str
        commands to put in PBS files
    prolog : list of str, optional
        code to execute before the commands (default: empty list)
    epilog : list of str, optional
        code to execute after the commands (default: empty list)
    command_params : dict, optional
        information about the commands; the keys read here are
        'nb_cores_per_command' and 'nb_gpus_per_command' (both default to 1)
    base_path : str, optional
        folder under which the job logs are placed (`<base_path>/logs/job/`)
    """

    def __init__(self, queue, commands, prolog=None, epilog=None, command_params=None, base_path="./"):
        # `None` sentinels instead of mutable defaults: the lists are stored on
        # the instance, so a shared default would leak between generators.
        if command_params is None:
            command_params = {}

        self.prolog = [] if prolog is None else prolog
        self.commands = commands
        self.epilog = [] if epilog is None else epilog
        self.queue = queue
        # `{{ext}}` survives this first `format` as `{ext}` and is filled in
        # later with 'out' or 'err'.
        self.job_log_filename = '"{base_path}/logs/job/"$PBS_JOBID".{{ext}}"'.format(base_path=base_path)

        self.nb_cores_per_command = command_params.get('nb_cores_per_command', 1)
        self.nb_gpus_per_command = command_params.get('nb_gpus_per_command', 1)
        #self.mem_per_command = command_params.get('mem_per_command', 0.0)

        self.pbs_list = self._generate_base_pbs()
        self._add_cluster_specific_rules()

    def _add_cluster_specific_rules(self):
        """ Hook for subclasses to tweak `self.pbs_list`; does nothing here. """
        pass

    def add_pbs_flags(self, flags):
        """ Adds PBS flags to every generated PBS file.

        Parameters
        ----------
        flags : list of str
            `-l<name>=<value>` flags are added as resources (a `-l<name>` flag
            without '=' yields an empty value); any other `-<X><value>` flag
            is added as option `X` with value `<value>`.

        Raises
        ------
        ValueError
            if a flag does not start with '-'
        """
        resources = {}
        options = {}

        for flag in flags:
            if flag.startswith('-l'):
                # Split on the first '=' only: the value may itself contain
                # '=' (e.g. "nodes=1:ppn=2").
                name, _, value = flag[2:].partition('=')
                resources[name] = value
            elif flag.startswith('-'):
                options[flag[1:2]] = flag[2:]
            else:
                raise ValueError("Invalid PBS flag ({})".format(flag))

        for pbs in self.pbs_list:
            pbs.add_resources(**resources)
            pbs.add_options(**options)

    def add_sbatch_flags(self, flags):
        """ Adds SBATCH flags to every generated PBS file.

        Parameters
        ----------
        flags : list of str
            either long flags of the form `--<name>=<value>` or short flags
            of the form `-<X><value>` (without any '=')

        Raises
        ------
        ValueError
            if a long flag has no '=', or if a flag matches neither form
        """
        options = {}

        for flag in flags:
            split = flag.find('=')
            if flag.startswith('--'):
                if split == -1:
                    raise ValueError("Invalid SBATCH flag ({}), no '=' character found' ".format(flag))
                options[flag[:split].lstrip("-")] = flag[split+1:]
            elif flag.startswith('-') and split == -1:
                options[flag[1:2]] = flag[2:]
            else:
                raise ValueError("Invalid SBATCH flag ({}, is it a PBS flag?)".format(flag))

        for pbs in self.pbs_list:
            pbs.add_sbatch_options(**options)

    def _generate_base_pbs(self):
        """ Generates PBS files allowing the execution of every commands on the given queue.

        Returns
        -------
        pbs_files : list of `PBS` instances
            one per chunk of commands, each chunk being requested on one node
        """
        # How many commands fit on a node, first by cores, then by gpus.
        nb_commands_per_node = self.queue.nb_cores_per_node // self.nb_cores_per_command

        if self.queue.nb_gpus_per_node > 0 and self.nb_gpus_per_command > 0:
            nb_commands_per_node = min(nb_commands_per_node, self.queue.nb_gpus_per_node // self.nb_gpus_per_command)

        pbs_files = []
        # Distribute equally the jobs among the PBS files and generate those files
        for commands in utils.chunks(self.commands, n=nb_commands_per_node):
            pbs = PBS(self.queue.name, self.queue.walltime)

            # TODO Move the add_options into the JobManager once created.
            pbs.add_options(o=self.job_log_filename.format(ext='out'), e=self.job_log_filename.format(ext='err'))

            # Set resource: nodes
            resource = "1:ppn={ppn}".format(ppn=len(commands) * self.nb_cores_per_command)
            if self.queue.nb_gpus_per_node > 0:
                resource += ":gpus={gpus}".format(gpus=len(commands) * self.nb_gpus_per_command)

            pbs.add_resources(nodes=resource)

            pbs.add_modules_to_load(*self.queue.modules)
            pbs.add_to_prolog(*self.prolog)
            pbs.add_commands(*commands)
            pbs.add_to_epilog(*self.epilog)

            pbs_files.append(pbs)

        return pbs_files

    def write_pbs_files(self, pbs_dir="./"):
        """ Writes PBS files allowing the execution of every commands on the given queue.

        Parameters
        ----------
        pbs_dir : str
            folder where to save pbs files

        Returns
        -------
        pbs_filenames : list of str
            paths of the saved files, named `job_commands_<index>.sh`
        """
        pbs_filenames = []
        for i, pbs in enumerate(self.pbs_list):
            pbs_filename = os.path.join(pbs_dir, 'job_commands_' + str(i) + '.sh')
            pbs.save(pbs_filename)
            pbs_filenames.append(pbs_filename)

        return pbs_filenames

    def specify_account_name_from_env(self, environment_variable_name):
        """ Sets option `A` of every PBS file from an environment variable.

        The account name is the last component of the resolved path stored in
        the variable.

        Raises
        ------
        ValueError
            if the environment variable is not defined
        """
        if environment_variable_name not in os.environ:
            raise ValueError("Undefined environment variable: ${}. Please, provide your account name!".format(environment_variable_name))

        account_name = os.path.basename(os.path.realpath(os.getenv(environment_variable_name)))
        for pbs in self.pbs_list:
            pbs.add_options(A=account_name)

    def specify_account_name_from_file(self, rapid_filename):
        """ Sets option `A` of every PBS file from the stripped content of a file.

        Raises
        ------
        ValueError
            if `rapid_filename` is not an existing file
        """
        if not os.path.isfile(rapid_filename):
            raise ValueError("Account name file {} does not exist. Please, provide your account name!".format(rapid_filename))

        with open(rapid_filename, 'r') as rapid_file:
            account_name = rapid_file.read().strip()

        for pbs in self.pbs_list:
            pbs.add_options(A=account_name)
159
|
|
|
|
160
|
|
|
|
161
|
|
|
class MammouthJobGenerator(JobGenerator):

    """ Job generator applying the Mammouth rule for queues ending in "@mp2". """

    def _add_cluster_specific_rules(self):
        """ Forces `ppn=1` in the 'nodes' resource of every PBS file on "@mp2" queues. """
        if not self.queue.name.endswith("@mp2"):
            # Other queues keep the ppn computed by the base generator.
            return

        for job in self.pbs_list:
            job.resources['nodes'] = re.sub("ppn=[0-9]+", "ppn=1", job.resources['nodes'])
167
|
|
|
|
168
|
|
|
|
169
|
|
|
class HadesJobGenerator(JobGenerator):

    """ Job generator rewriting the 'nodes' resource for Hades. """

    def _add_cluster_specific_rules(self):
        """ Replaces the ppn value by the number of gpus and drops the gpus part.

        Raises `AttributeError` when a 'nodes' resource has no `gpus=<n>` part,
        since the regular expression then does not match.
        """
        for job in self.pbs_list:
            nodes = job.resources['nodes']
            nb_gpus = re.match(".*gpus=([0-9]+)", nodes).group(1)

            # e.g. "1:ppn=4:gpus=2" becomes "1:ppn=2"
            nodes = re.sub("ppn=[0-9]+", "ppn={}".format(nb_gpus), nodes)
            job.resources['nodes'] = re.sub(":gpus=[0-9]+", "", nodes)
176
|
|
|
|
177
|
|
|
|
178
|
|
|
class GuilliminJobGenerator(JobGenerator):

    """ Job generator adding the account name required on Guillimin. """

    def _add_cluster_specific_rules(self):
        """ Reads the account name from the `HOME_GROUP` environment variable. """
        account_variable = 'HOME_GROUP'
        return self.specify_account_name_from_env(account_variable)
182
|
|
|
|
183
|
|
|
|
184
|
|
|
# https://wiki.calculquebec.ca/w/Ex%C3%A9cuter_une_t%C3%A2che#tab=tab6 |
185
|
|
|
class HeliosJobGenerator(JobGenerator):

    """ Job generator adding the account name and removing ppn for Helios. """

    def _add_cluster_specific_rules(self):
        """ Reads the account name from `$HOME/.default_rap` and strips `:ppn=<n>`. """
        rap_filename = os.path.join(os.environ['HOME'], ".default_rap")
        self.specify_account_name_from_file(rap_filename)

        # Remove forbidden ppn option. Default is 2 cores per gpu.
        for job in self.pbs_list:
            job.resources['nodes'] = re.sub(":ppn=[0-9]+", "", job.resources['nodes'])
193
|
|
|
|
194
|
|
|
|
195
|
|
|
class SlurmJobGenerator(JobGenerator):

    """ Job generator adapting the generated PBS files to Slurm (sbatch). """

    def __init__(self, *args, **kwargs):
        super(SlurmJobGenerator, self).__init__(*args, **kwargs)

    def _adapt_options(self, pbs):
        """ Converts the PBS options of `pbs` to what sbatch expects. """
        # Remove queue, there is no queue in Slurm
        if "-q" in pbs.options:
            del pbs.options["-q"]

        # SBATCH does not interpret options, they can only contain %A if we
        # want to include job's name and %a to include job array's index
        for option in ['-o', '-e']:
            # Raw string: `\$` is a regex escape, not a Python string escape.
            pbs.options[option] = re.sub(r'"\$PBS_JOBID"', '%A',
                                         pbs.options[option])

        # Convert to Slurm's --export
        #
        # Warning: Slurm does **not** export variables defined locally such as
        #          variables defined along the command line. For ex:
        #              PBS_FILENAME=something sbatch --export=ALL somefile.sh
        #          would *not* export PBS_FILENAME to the script.
        if pbs.options.pop('-V', None) is not None:
            pbs.add_sbatch_options(export='ALL')

    def _adapt_commands(self, pbs):
        """ Placeholder for command adaptation; does nothing. """
        pass

    def _adapt_resources(self, pbs):
        """ Moves the gpus and ppn counts out of the 'nodes' resource.

        `gpus=<n>` becomes the `naccelerators` resource and `ppn=<n>` becomes
        the `ncpus` resource.
        """
        # Set proper option for gpus
        match = re.match(".*gpus=([0-9]+)", pbs.resources['nodes'])
        if match:
            gpus = match.group(1)
            pbs.add_resources(naccelerators=gpus)
            pbs.resources['nodes'] = re.sub(":gpus=[0-9]+", "",
                                            pbs.resources['nodes'])

        # Set proper option for cpus
        match = re.match(".*ppn=([0-9]+)", pbs.resources['nodes'])
        if match:
            ppn = match.group(1)
            pbs.add_resources(ncpus=ppn)
            # NOTE(review): unlike the gpus pattern, this one does not include
            # the leading ':', so "1:ppn=4" becomes "1:" -- confirm intended.
            pbs.resources['nodes'] = re.sub("ppn=[0-9]+", "", pbs.resources['nodes'])

    def _adapt_variable_names(self, pbs):
        """ Replaces PBS environment variable names in the commands of `pbs`. """
        for command_id, command in enumerate(pbs.commands):
            pbs.commands[command_id] = command = re.sub(
                r"\$PBS_JOBID", "$SLURM_JOB_ID", command)
            # NOTE: SBATCH_TIMELIMIT is **not** an official slurm environment
            # variable, it needs to be set in the script.
            pbs.commands[command_id] = command = re.sub(
                r"\$PBS_WALLTIME", "$SBATCH_TIMELIMIT", command)

    def _adapt_prolog(self, pbs):
        """ Defines SBATCH_TIMELIMIT from the walltime resource of `pbs`. """
        # Set SBATCH_TIMELIMIT in the prolog, hence, before any code from
        # commands and epilog.
        pbs.add_to_prolog(
            "SBATCH_TIMELIMIT=%s" %
            utils.walltime_to_seconds(pbs.resources["walltime"]))

    def _add_cluster_specific_rules(self):
        """ Applies every Slurm adaptation to each generated PBS file. """
        for pbs in self.pbs_list:
            self._adapt_options(pbs)
            self._adapt_resources(pbs)
            self._adapt_variable_names(pbs)
            self._adapt_prolog(pbs)
            self._adapt_commands(pbs)
262
|
|
|
|