from __future__ import absolute_import

import os
import re
from smartdispatch.pbs import PBS
from smartdispatch import utils


def job_generator_factory(queue, commands, prolog=[], epilog=[], command_params={}, cluster_name=None, base_path="./"):
    """ Returns a `JobGenerator` subclass with the rules of the given cluster, or a plain `JobGenerator` if the cluster is unknown. """
    if cluster_name == "guillimin":
        return GuilliminJobGenerator(queue, commands, prolog, epilog, command_params, base_path)
    elif cluster_name == "mammouth":
        return MammouthJobGenerator(queue, commands, prolog, epilog, command_params, base_path)
    elif cluster_name == "helios":
        return HeliosJobGenerator(queue, commands, prolog, epilog, command_params, base_path)
    elif cluster_name == "hades":
        return HadesJobGenerator(queue, commands, prolog, epilog, command_params, base_path)

    return JobGenerator(queue, commands, prolog, epilog, command_params, base_path)
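
# A minimal usage sketch (assuming `queue` is an already-configured Queue
# instance describing the target cluster's cores/gpus/walltime per node):
#
#     job_generator = job_generator_factory(queue,
#                                           ["python train.py --lr 0.01",
#                                            "python train.py --lr 0.1"],
#                                           cluster_name="guillimin")
#     pbs_filenames = job_generator.write_pbs_files("./pbs")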


class JobGenerator(object):

    """ Offers functionality to generate PBS files for a given queue.

    Parameters
    ----------
    queue : `Queue` instance
        queue on which commands will be executed
    commands : list of str
        commands to put in PBS files
    prolog : list of str
        code to execute before the commands
    epilog : list of str
        code to execute after the commands
    command_params : dict
        information about the commands
    """

    def __init__(self, queue, commands, prolog=[], epilog=[], command_params={}, base_path="./"):
        self.prolog = prolog
        self.commands = commands
        self.epilog = epilog
        self.queue = queue
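        # Job logs are written under base_path as logs/job/$PBS_JOBID.out and
        # logs/job/$PBS_JOBID.err; the {{ext}} placeholder is filled in later
        # via job_log_filename.format(ext=...).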
        self.job_log_filename = '"{base_path}/logs/job/"$PBS_JOBID".{{ext}}"'.format(base_path=base_path)

        self.nb_cores_per_command = command_params.get('nb_cores_per_command', 1)
        self.nb_gpus_per_command = command_params.get('nb_gpus_per_command', 1)
        #self.mem_per_command = command_params.get('mem_per_command', 0.0)

        self.pbs_list = self._generate_base_pbs()
        self._add_cluster_specific_rules()

    def _add_cluster_specific_rules(self):
        pass

    def add_pbs_flags(self, flags):
        resources = {}
        options = {}

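        # Flags follow qsub's short-option syntax: e.g. "-lwalltime=24:00:00"
        # becomes the resource walltime=24:00:00, while "-qtest" becomes the
        # option q=test.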
        for flag in flags:
            if flag.startswith('-l'):
                resource = flag[2:]
                split = resource.find('=')
                resources[resource[:split]] = resource[split+1:]
            elif flag.startswith('-'):
                options[flag[1:2]] = flag[2:]
            else:
                raise ValueError("Invalid PBS flag ({})".format(flag))

        for pbs in self.pbs_list:
            pbs.add_resources(**resources)
            pbs.add_options(**options)

    def _generate_base_pbs(self):
        """ Generates PBS files allowing the execution of all commands on the given queue. """
        nb_commands_per_node = self.queue.nb_cores_per_node // self.nb_cores_per_command

        if self.queue.nb_gpus_per_node > 0 and self.nb_gpus_per_command > 0:
            nb_commands_per_node = min(nb_commands_per_node, self.queue.nb_gpus_per_node // self.nb_gpus_per_command)

        pbs_files = []
        # Distribute the jobs equally among the PBS files and generate those files.
        for commands in utils.chunks(self.commands, n=nb_commands_per_node):
            pbs = PBS(self.queue.name, self.queue.walltime)

            # TODO: Move the add_options into the JobManager once created.
            pbs.add_options(o=self.job_log_filename.format(ext='out'), e=self.job_log_filename.format(ext='err'))

            # Set resource: nodes
            resource = "1:ppn={ppn}".format(ppn=len(commands) * self.nb_cores_per_command)
            if self.queue.nb_gpus_per_node > 0:
                resource += ":gpus={gpus}".format(gpus=len(commands) * self.nb_gpus_per_command)

            pbs.add_resources(nodes=resource)

            pbs.add_modules_to_load(*self.queue.modules)
            pbs.add_to_prolog(*self.prolog)
            pbs.add_commands(*commands)
            pbs.add_to_epilog(*self.epilog)

            pbs_files.append(pbs)

        return pbs_files

    def write_pbs_files(self, pbs_dir="./"):
        """ Writes PBS files allowing the execution of all commands on the given queue.

        Parameters
        ----------
        pbs_dir : str
            folder where to save pbs files
        """
        pbs_filenames = []
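        # Files are numbered in chunk order: job_commands_0.sh, job_commands_1.sh, ...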
        for i, pbs in enumerate(self.pbs_list):
            pbs_filename = os.path.join(pbs_dir, 'job_commands_' + str(i) + '.sh')
            pbs.save(pbs_filename)
            pbs_filenames.append(pbs_filename)

        return pbs_filenames

    def specify_account_name_from_env(self, environment_variable_name):
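        # The account name is taken to be the last component of the (resolved)
        # path stored in the environment variable, e.g. $HOME_GROUP=/home/my_group
        # gives the account name "my_group".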
        if environment_variable_name not in os.environ:
            raise ValueError("Undefined environment variable: ${}. Please provide your account name!".format(environment_variable_name))

        account_name = os.path.basename(os.path.realpath(os.getenv(environment_variable_name)))
        for pbs in self.pbs_list:
            pbs.add_options(A=account_name)

    def specify_account_name_from_file(self, rapid_filename):
        if not os.path.isfile(rapid_filename):
            raise ValueError("Account name file {} does not exist. Please provide your account name!".format(rapid_filename))

        with open(rapid_filename, 'r') as rapid_file:
            account_name = rapid_file.read().strip()

        for pbs in self.pbs_list:
            pbs.add_options(A=account_name)


class MammouthJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
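        # Queues on the mp2 cluster expect ppn=1 (presumably whole-node
        # scheduling); rewrite the nodes resource accordingly.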
        if self.queue.name.endswith("@mp2"):
            for pbs in self.pbs_list:
                pbs.resources['nodes'] = re.sub("ppn=[0-9]+", "ppn=1", pbs.resources['nodes'])


class HadesJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
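        # Hades takes the number of GPUs through ppn instead of a gpus
        # resource: copy the gpus count into ppn, then drop the gpus part.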
        for pbs in self.pbs_list:
            gpus = re.match(".*gpus=([0-9]+)", pbs.resources['nodes']).group(1)
            pbs.resources['nodes'] = re.sub("ppn=[0-9]+", "ppn={}".format(gpus), pbs.resources['nodes'])
            pbs.resources['nodes'] = re.sub(":gpus=[0-9]+", "", pbs.resources['nodes'])


class GuilliminJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
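        # Guillimin expects an account name (-A option), derived here from
        # the $HOME_GROUP environment variable.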
        return self.specify_account_name_from_env('HOME_GROUP')


# https://wiki.calculquebec.ca/w/Ex%C3%A9cuter_une_t%C3%A2che#tab=tab6
class HeliosJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
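        # The RAP id used as the account name is read from ~/.default_rap.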
        self.specify_account_name_from_file(os.path.join(os.environ['HOME'], ".default_rap"))

        for pbs in self.pbs_list:
            # Remove the forbidden ppn option; Helios allocates 2 cores per GPU by default.
            pbs.resources['nodes'] = re.sub(":ppn=[0-9]+", "", pbs.resources['nodes'])