from __future__ import absolute_import

import os
import re
from smartdispatch.pbs import PBS
from smartdispatch import utils


def job_generator_factory(queue, commands, command_params={}, cluster_name=None, base_path="./"):
    """ Returns a cluster-specific `JobGenerator`, or a generic one if `cluster_name` is unknown. """
    if cluster_name == "guillimin":
        return GuilliminJobGenerator(queue, commands, command_params, base_path)
    elif cluster_name == "mammouth":
        return MammouthJobGenerator(queue, commands, command_params, base_path)
    elif cluster_name == "helios":
        return HeliosJobGenerator(queue, commands, command_params, base_path)
    elif cluster_name == "hades":
        return HadesJobGenerator(queue, commands, command_params, base_path)

    return JobGenerator(queue, commands, command_params, base_path)


class JobGenerator(object):

    """ Offers functionality to generate PBS files for a given queue.

    Parameters
    ----------
    queue : `Queue` instance
        queue on which commands will be executed
    commands : list of str
        commands to put in PBS files
    command_params : dict
        information about the commands
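
    Examples
    --------
    A minimal sketch; `queue` is assumed to expose the attributes this class
    reads (`name`, `walltime`, `nb_cores_per_node`, `nb_gpus_per_node` and
    `modules`)::

        commands = ["python train.py --lr 0.01", "python train.py --lr 0.1"]
        job_generator = JobGenerator(queue, commands)
        pbs_filenames = job_generator.write_pbs_files("/tmp/pbs")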
    """

    def __init__(self, queue, commands, command_params={}, base_path="./"):
        self.commands = commands
        self.queue = queue
        self.job_log_filename = '"{base_path}/logs/job/"$PBS_JOBID".{{ext}}"'.format(base_path=base_path)

        self.nb_cores_per_command = command_params.get('nb_cores_per_command', 1)
        self.nb_gpus_per_command = command_params.get('nb_gpus_per_command', 1)
        #self.mem_per_command = command_params.get('mem_per_command', 0.0)

        self.pbs_list = self._generate_base_pbs()
        self._add_cluster_specific_rules()

    def _add_cluster_specific_rules(self):
        pass

    def add_pbs_flags(self, flags):
        # Apply extra "-l" resource flags (e.g. "-lwalltime=48:00:00") to every generated PBS file.
        for flag in flags:
            if flag.startswith('-l'):
                name, _, value = flag[2:].strip().partition('=')  # str has no trim(); use strip()
                for pbs in self.pbs_list:
                    pbs.add_resources(**{name: value})  # assumes add_resources accepts keyword resources

    def _generate_base_pbs(self):
        """ Generates PBS files allowing the execution of every command on the given queue. """
        nb_commands_per_node = self.queue.nb_cores_per_node // self.nb_cores_per_command

        if self.queue.nb_gpus_per_node > 0 and self.nb_gpus_per_command > 0:
            nb_commands_per_node = min(nb_commands_per_node, self.queue.nb_gpus_per_node // self.nb_gpus_per_command)
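        # Worked example (hypothetical numbers): on a 16-core node with 8 GPUs, commands needing
        # 2 cores and 1 GPU each are packed min(16 // 2, 8 // 1) = 8 commands per PBS file.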

        pbs_files = []
        # Distribute the jobs equally among the PBS files and generate those files.
        for i, commands in enumerate(utils.chunks(self.commands, n=nb_commands_per_node)):
            pbs = PBS(self.queue.name, self.queue.walltime)

            # TODO Move the add_options into the JobManager once created.
            pbs.add_options(o=self.job_log_filename.format(ext='out'), e=self.job_log_filename.format(ext='err'))

            # Set resource: nodes
            resource = "1:ppn={ppn}".format(ppn=len(commands) * self.nb_cores_per_command)
            if self.queue.nb_gpus_per_node > 0:
                resource += ":gpus={gpus}".format(gpus=len(commands) * self.nb_gpus_per_command)

            pbs.add_resources(nodes=resource)

            pbs.add_modules_to_load(*self.queue.modules)
            pbs.add_commands(*commands)

            pbs_files.append(pbs)

        return pbs_files

    def write_pbs_files(self, pbs_dir="./"):
        """ Writes PBS files allowing the execution of every command on the given queue.

        Parameters
        ----------
        pbs_dir : str
            folder where to save pbs files

        Returns
        -------
        list of str
            paths of the saved PBS files
        """
        pbs_filenames = []
        for i, pbs in enumerate(self.pbs_list):
            pbs_filename = os.path.join(pbs_dir, 'job_commands_' + str(i) + '.sh')
            pbs.save(pbs_filename)
            pbs_filenames.append(pbs_filename)

        return pbs_filenames

    def specify_account_name_from_env(self, environment_variable_name):
        if environment_variable_name not in os.environ:
            raise ValueError("Undefined environment variable: ${}. Please provide your account name!".format(environment_variable_name))

        account_name = os.path.basename(os.path.realpath(os.getenv(environment_variable_name)))
        for pbs in self.pbs_list:
            pbs.add_options(A=account_name)

    def specify_account_name_from_file(self, rapid_filename):
        if not os.path.isfile(rapid_filename):
            raise ValueError("Account name file {} does not exist. Please provide your account name!".format(rapid_filename))

        with open(rapid_filename, 'r') as rapid_file:
            account_name = rapid_file.read().strip()

        for pbs in self.pbs_list:
            pbs.add_options(A=account_name)


class MammouthJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
        if self.queue.name.endswith("@mp2"):
            for pbs in self.pbs_list:
                pbs.resources['nodes'] = re.sub("ppn=[0-9]+", "ppn=1", pbs.resources['nodes'])


class HadesJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
        for pbs in self.pbs_list:
            gpus = re.match(".*gpus=([0-9]+)", pbs.resources['nodes']).group(1)
            pbs.resources['nodes'] = re.sub("ppn=[0-9]+", "ppn={}".format(gpus), pbs.resources['nodes'])
            pbs.resources['nodes'] = re.sub(":gpus=[0-9]+", "", pbs.resources['nodes'])


class GuilliminJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
        return self.specify_account_name_from_env('HOME_GROUP')


# https://wiki.calculquebec.ca/w/Ex%C3%A9cuter_une_t%C3%A2che#tab=tab6
class HeliosJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
        self.specify_account_name_from_file(os.path.join(os.environ['HOME'], ".default_rap"))

        for pbs in self.pbs_list:
            # Remove forbidden ppn option. Default is 2 cores per gpu.
            pbs.resources['nodes'] = re.sub(":ppn=[0-9]+", "", pbs.resources['nodes'])
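

# ---------------------------------------------------------------------------
# Usage sketch (illustration only, not part of the smartdispatch API). The
# stand-in queue below is hypothetical and merely exposes the attributes this
# module reads (name, walltime, nb_cores_per_node, nb_gpus_per_node, modules);
# a real `Queue` instance would normally be used instead.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    class _FakeQueue(object):
        name = "qtest"
        walltime = "01:00:00"
        nb_cores_per_node = 16
        nb_gpus_per_node = 0
        modules = []

    # Generate one PBS file packing the four commands and write it to the current directory.
    generator = JobGenerator(_FakeQueue(), ["echo command_{}".format(i) for i in range(4)])
    print(generator.write_pbs_files("."))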