1
|
|
|
from __future__ import absolute_import |
2
|
|
|
|
3
|
|
|
import os |
4
|
|
|
import re |
5
|
|
|
from smartdispatch.pbs import PBS |
6
|
|
|
from smartdispatch import utils |
7
|
|
|
|
8
|
|
|
|
9
|
|
|
def job_generator_factory(queue, commands, command_params={}, cluster_name=None, base_path="./"): |
10
|
|
|
if cluster_name == "guillimin": |
11
|
|
|
return GuilliminJobGenerator(queue, commands, command_params, base_path) |
12
|
|
|
elif cluster_name == "mammouth": |
13
|
|
|
return MammouthJobGenerator(queue, commands, command_params, base_path) |
14
|
|
|
elif cluster_name == "helios": |
15
|
|
|
return HeliosJobGenerator(queue, commands, command_params, base_path) |
16
|
|
|
elif cluster_name == "hades": |
17
|
|
|
return HadesJobGenerator(queue, commands, command_params, base_path) |
18
|
|
|
|
19
|
|
|
return JobGenerator(queue, commands, command_params, base_path) |
20
|
|
|
|
21
|
|
|
|
22
|
|
|
class JobGenerator(object): |
23
|
|
|
|
24
|
|
|
""" Offers functionalities to generate PBS files for a given queue. |
25
|
|
|
|
26
|
|
|
Parameters |
27
|
|
|
---------- |
28
|
|
|
queue : `Queue` instance |
29
|
|
|
queue on which commands will be executed |
30
|
|
|
commands : list of str |
31
|
|
|
commands to put in PBS files |
32
|
|
|
command_params : dict |
33
|
|
|
information about the commands |
34
|
|
|
""" |
35
|
|
|
|
36
|
|
|
def __init__(self, queue, commands, command_params={}, base_path="./"): |
37
|
|
|
self.commands = commands |
38
|
|
|
self.queue = queue |
39
|
|
|
self.job_log_filename = '"{base_path}/logs/job/"$PBS_JOBID".{{ext}}"'.format(base_path=base_path) |
40
|
|
|
|
41
|
|
|
self.nb_cores_per_command = command_params.get('nb_cores_per_command', 1) |
42
|
|
|
self.nb_gpus_per_command = command_params.get('nb_gpus_per_command', 1) |
43
|
|
|
#self.mem_per_command = command_params.get('mem_per_command', 0.0) |
44
|
|
|
|
45
|
|
|
def generate_pbs(self): |
46
|
|
|
""" Generates PBS files allowing the execution of every commands on the given queue. """ |
47
|
|
|
nb_commands_per_node = self.queue.nb_cores_per_node // self.nb_cores_per_command |
48
|
|
|
|
49
|
|
|
if self.queue.nb_gpus_per_node > 0 and self.nb_gpus_per_command > 0: |
50
|
|
|
nb_commands_per_node = min(nb_commands_per_node, self.queue.nb_gpus_per_node // self.nb_gpus_per_command) |
51
|
|
|
|
52
|
|
|
pbs_files = [] |
53
|
|
|
# Distribute equally the jobs among the PBS files and generate those files |
54
|
|
|
for i, commands in enumerate(utils.chunks(self.commands, n=nb_commands_per_node)): |
55
|
|
|
pbs = PBS(self.queue.name, self.queue.walltime) |
56
|
|
|
|
57
|
|
|
# TODO Move the add_options into the JobManager once created. |
58
|
|
|
pbs.add_options(o=self.job_log_filename.format(ext='out'), e=self.job_log_filename.format(ext='err')) |
59
|
|
|
|
60
|
|
|
# Set resource: nodes |
61
|
|
|
resource = "1:ppn={ppn}".format(ppn=len(commands) * self.nb_cores_per_command) |
62
|
|
|
if self.queue.nb_gpus_per_node > 0: |
63
|
|
|
resource += ":gpus={gpus}".format(gpus=len(commands) * self.nb_gpus_per_command) |
64
|
|
|
|
65
|
|
|
pbs.add_resources(nodes=resource) |
66
|
|
|
|
67
|
|
|
pbs.add_modules_to_load(*self.queue.modules) |
68
|
|
|
pbs.add_commands(*commands) |
69
|
|
|
|
70
|
|
|
pbs_files.append(pbs) |
71
|
|
|
|
72
|
|
|
return pbs_files |
73
|
|
|
|
74
|
|
|
def write_pbs_files(self, pbs_dir="./"): |
75
|
|
|
""" Writes PBS files allowing the execution of every commands on the given queue. |
76
|
|
|
|
77
|
|
|
Parameters |
78
|
|
|
---------- |
79
|
|
|
pbs_dir : str |
80
|
|
|
folder where to save pbs files |
81
|
|
|
""" |
82
|
|
|
pbs_list = self.generate_pbs() |
83
|
|
|
pbs_filenames = [] |
84
|
|
|
for i, pbs in enumerate(pbs_list): |
85
|
|
|
pbs_filename = os.path.join(pbs_dir, 'job_commands_' + str(i) + '.sh') |
86
|
|
|
pbs.save(pbs_filename) |
87
|
|
|
pbs_filenames.append(pbs_filename) |
88
|
|
|
|
89
|
|
|
return pbs_filenames |
90
|
|
|
|
91
|
|
|
def specify_account_name_from_env(self, pbs_list, environment_variable_name): |
92
|
|
|
if environment_variable_name not in os.environ: |
93
|
|
|
raise ValueError("Undefined environment variable: ${}. Please, provide your account name!".format(environment_variable_name)) |
94
|
|
|
|
95
|
|
|
account_name = os.path.basename(os.path.realpath(os.getenv(environment_variable_name))) |
96
|
|
|
for pbs in pbs_list: |
97
|
|
|
pbs.add_options(A=account_name) |
98
|
|
|
|
99
|
|
|
return pbs_list |
100
|
|
|
|
101
|
|
|
def specify_account_name_from_file(self, pbs_list, rapid_filename): |
102
|
|
|
if not os.path.isfile(rapid_filename): |
103
|
|
|
raise ValueError("Account name file {} does not exist. Please, provide your account name!".format(rapid_filename)) |
104
|
|
|
|
105
|
|
|
with open(rapid_filename, 'r') as rapid_file: |
106
|
|
|
account_name = rapid_file.read().strip() |
107
|
|
|
|
108
|
|
|
for pbs in pbs_list: |
109
|
|
|
pbs.add_options(A=account_name) |
110
|
|
|
|
111
|
|
|
return pbs_list |
112
|
|
|
|
113
|
|
|
|
114
|
|
|
class MammouthJobGenerator(JobGenerator): |
115
|
|
|
|
116
|
|
|
def generate_pbs(self): |
117
|
|
|
pbs_list = super(MammouthJobGenerator, self).generate_pbs() |
118
|
|
|
|
119
|
|
|
if self.queue.name.endswith("@mp2"): |
120
|
|
|
for pbs in pbs_list: |
121
|
|
|
pbs.resources['nodes'] = re.sub("ppn=[0-9]+", "ppn=1", pbs.resources['nodes']) |
122
|
|
|
|
123
|
|
|
return pbs_list |
124
|
|
|
|
125
|
|
|
|
126
|
|
|
class HadesJobGenerator(JobGenerator): |
127
|
|
|
|
128
|
|
|
def generate_pbs(self): |
129
|
|
|
pbs_list = super(HadesJobGenerator, self).generate_pbs() |
130
|
|
|
|
131
|
|
|
for pbs in pbs_list: |
132
|
|
|
gpus = re.match(".*gpus=([0-9]+)", pbs.resources['nodes']).group(1) |
133
|
|
|
pbs.resources['nodes'] = re.sub("ppn=[0-9]+", "ppn={}".format(gpus), pbs.resources['nodes']) |
134
|
|
|
pbs.resources['nodes'] = re.sub(":gpus=[0-9]+", "", pbs.resources['nodes']) |
135
|
|
|
|
136
|
|
|
return pbs_list |
137
|
|
|
|
138
|
|
|
|
139
|
|
|
class GuilliminJobGenerator(JobGenerator): |
140
|
|
|
|
141
|
|
|
def generate_pbs(self): |
142
|
|
|
pbs_list = super(GuilliminJobGenerator, self).generate_pbs() |
143
|
|
|
return self.specify_account_name_from_env(pbs_list, 'HOME_GROUP') |
144
|
|
|
|
145
|
|
|
|
146
|
|
|
# https://wiki.calculquebec.ca/w/Ex%C3%A9cuter_une_t%C3%A2che#tab=tab6 |
147
|
|
|
class HeliosJobGenerator(JobGenerator): |
148
|
|
|
|
149
|
|
|
def generate_pbs(self): |
150
|
|
|
pbs_list = super(HeliosJobGenerator, self).generate_pbs() |
151
|
|
|
pbs_list = self.specify_account_name_from_file(pbs_list, os.path.join(os.environ['HOME'], ".default_rap")) |
152
|
|
|
|
153
|
|
|
for pbs in pbs_list: |
154
|
|
|
# Remove forbidden ppn option. Default is 2 cores per gpu. |
155
|
|
|
pbs.resources['nodes'] = re.sub(":ppn=[0-9]+", "", pbs.resources['nodes']) |
156
|
|
|
|
157
|
|
|
return pbs_list |
158
|
|
|
|