Completed — Pull Request — master (#173) · by unknown · 27s

JobGenerator.add_sbatch_flags() — grade B

Complexity:   Conditions 7
Size:         Total Lines 16
Duplication:  Lines 0 · Ratio 0 %
Importance:   Changes 1 · Bugs 0 · Features 0

Metric                          Value
cc  (conditions / complexity)       7
c   (changes)                       1
b   (bugs)                          0
f   (features)                      0
dl  (duplicated lines)              0
loc (total lines)                  16
rs                             7.3333
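
The report does not name the tool that produced these per-metric values. Assuming a radon-style analyzer (an assumption; the file path below is also hypothetical), the grade next to add_sbatch_flags can be reproduced like this:

    from radon.complexity import cc_visit, cc_rank

    with open("smartdispatch/job_generator.py") as handle:  # hypothetical path
        blocks = cc_visit(handle.read())

    for block in blocks:
        # cc_visit returns top-level functions and classes; methods hang off the classes
        for unit in [block] + list(getattr(block, "methods", [])):
            if unit.name == "add_sbatch_flags":
                # radon maps a cyclomatic complexity of 6-10 to grade B
                print(unit.complexity, cc_rank(unit.complexity))  # expected: 7 B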
from __future__ import absolute_import

import os
import re
from smartdispatch.pbs import PBS
from smartdispatch import utils


def job_generator_factory(queue, commands, prolog=[], epilog=[], command_params={}, cluster_name=None, base_path="./"):
    if cluster_name == "guillimin":
        return GuilliminJobGenerator(queue, commands, prolog, epilog, command_params, base_path)
    elif cluster_name == "mammouth":
        return MammouthJobGenerator(queue, commands, prolog, epilog, command_params, base_path)
    elif cluster_name == "helios":
        return HeliosJobGenerator(queue, commands, prolog, epilog, command_params, base_path)
    elif cluster_name == "hades":
        return HadesJobGenerator(queue, commands, prolog, epilog, command_params, base_path)
    elif utils.get_launcher(cluster_name) == "sbatch":
        return SlurmJobGenerator(queue, commands, prolog, epilog, command_params, base_path)

    return JobGenerator(queue, commands, prolog, epilog, command_params, base_path)
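
# Usage sketch (illustrative): `queue` is a Queue instance built elsewhere in
# smartdispatch, and the command list is hypothetical.
#
#     generator = job_generator_factory(queue, ["python train.py --lr 0.01"],
#                                       cluster_name="helios")
#     pbs_filenames = generator.write_pbs_files("./pbs")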


class JobGenerator(object):

    """ Offers functionality to generate PBS files for a given queue.

    Parameters
    ----------
    queue : `Queue` instance
        queue on which commands will be executed
    commands : list of str
        commands to put in PBS files
    prolog : list of str
        code to execute before the commands
    epilog : list of str
        code to execute after the commands
    command_params : dict
        information about the commands
    """

    def __init__(self, queue, commands, prolog=[], epilog=[], command_params={}, base_path="./"):
        self.prolog = prolog
        self.commands = commands
        self.epilog = epilog
        self.queue = queue
        self.job_log_filename = '"{base_path}/logs/job/"$PBS_JOBID".{{ext}}"'.format(base_path=base_path)
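        # NOTE: the embedded double quotes are shell quoting for the generated
        # script, so $PBS_JOBID is expanded at run time; {{ext}} survives this
        # .format() call as a literal {ext} placeholder, filled in later with
        # job_log_filename.format(ext='out') / .format(ext='err').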

        self.nb_cores_per_command = command_params.get('nb_cores_per_command', 1)
        self.nb_gpus_per_command = command_params.get('nb_gpus_per_command', 1)
        #self.mem_per_command = command_params.get('mem_per_command', 0.0)

        self.pbs_list = self._generate_base_pbs()
        self._add_cluster_specific_rules()

    def _add_cluster_specific_rules(self):
        pass

    def add_pbs_flags(self, flags):
        resources = {}
        options = {}

        for flag in flags:
            if flag.startswith('-l'):
                resource = flag[2:]
                split = resource.find('=')
                resources[resource[:split]] = resource[split+1:]
            elif flag.startswith('-'):
                options[flag[1:2]] = flag[2:]
            else:
                raise ValueError("Invalid PBS flag ({})".format(flag))

        for pbs in self.pbs_list:
            pbs.add_resources(**resources)
            pbs.add_options(**options)
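
    # Example (illustrative): add_pbs_flags(["-lwalltime=24:00:00", "-qtest"])
    # parses into resources {'walltime': '24:00:00'} and options {'q': 'test'},
    # which are then applied to every generated PBS file.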

    def add_sbatch_flags(self, flags):
        options = {}

        for flag in flags:
            split = flag.find('=')
            if flag.startswith('--'):
                if split == -1:
                    raise ValueError("Invalid SBATCH flag ({}): no '=' character found".format(flag))
                options[flag[:split].lstrip("-")] = flag[split+1:]
            elif flag.startswith('-') and split == -1:
                options[flag[1:2]] = flag[2:]
            else:
                raise ValueError("Invalid SBATCH flag ({}): is it a PBS flag?".format(flag))

        for pbs in self.pbs_list:
            pbs.add_sbatch_options(**options)
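
    # Example (illustrative): add_sbatch_flags(["--gres=gpu:2", "-pdebug"])
    # yields options {'gres': 'gpu:2', 'p': 'debug'}. Short flags must be glued
    # to their value ("-pdebug"), and long flags must use '='.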

    def _generate_base_pbs(self):
        """ Generates PBS files allowing the execution of every command on the given queue. """
        nb_commands_per_node = self.queue.nb_cores_per_node // self.nb_cores_per_command

        if self.queue.nb_gpus_per_node > 0 and self.nb_gpus_per_command > 0:
            nb_commands_per_node = min(nb_commands_per_node, self.queue.nb_gpus_per_node // self.nb_gpus_per_command)

        pbs_files = []
        # Distribute the jobs equally among the PBS files and generate those files
        for commands in utils.chunks(self.commands, n=nb_commands_per_node):
            pbs = PBS(self.queue.name, self.queue.walltime)

            # TODO: Move the add_options into the JobManager once created.
            pbs.add_options(o=self.job_log_filename.format(ext='out'), e=self.job_log_filename.format(ext='err'))

            # Set resource: nodes
            resource = "1:ppn={ppn}".format(ppn=len(commands) * self.nb_cores_per_command)
            if self.queue.nb_gpus_per_node > 0:
                resource += ":gpus={gpus}".format(gpus=len(commands) * self.nb_gpus_per_command)

            pbs.add_resources(nodes=resource)

            pbs.add_modules_to_load(*self.queue.modules)
            pbs.add_to_prolog(*self.prolog)
            pbs.add_commands(*commands)
            pbs.add_to_epilog(*self.epilog)

            pbs_files.append(pbs)

        return pbs_files
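
    # Worked example (illustrative numbers): a queue with 16 cores and 8 GPUs
    # per node, running commands that need 2 cores and 1 GPU each, packs
    # min(16 // 2, 8 // 1) = 8 commands per PBS file, and each file requests
    # nodes=1:ppn=16:gpus=8.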

    def write_pbs_files(self, pbs_dir="./"):
        """ Writes PBS files allowing the execution of every command on the given queue.

        Parameters
        ----------
        pbs_dir : str
            folder where to save the PBS files
        """
        pbs_filenames = []
        for i, pbs in enumerate(self.pbs_list):
            pbs_filename = os.path.join(pbs_dir, 'job_commands_' + str(i) + '.sh')
            pbs.save(pbs_filename)
            pbs_filenames.append(pbs_filename)

        return pbs_filenames

    def specify_account_name_from_env(self, environment_variable_name):
        if environment_variable_name not in os.environ:
            raise ValueError("Undefined environment variable: ${}. Please provide your account name!".format(environment_variable_name))

        account_name = os.path.basename(os.path.realpath(os.getenv(environment_variable_name)))
        for pbs in self.pbs_list:
            pbs.add_options(A=account_name)
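
    # Example (illustrative): if $HOME_GROUP resolves to /home/groups/xyz-123,
    # the account name becomes "xyz-123" (the basename of the real path) and is
    # attached to every PBS file via the -A (account) option.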

    def specify_account_name_from_file(self, rapid_filename):
        if not os.path.isfile(rapid_filename):
            raise ValueError("Account name file {} does not exist. Please provide your account name!".format(rapid_filename))

        with open(rapid_filename, 'r') as rapid_file:
            account_name = rapid_file.read().strip()

        for pbs in self.pbs_list:
            pbs.add_options(A=account_name)
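
    # Example (illustrative): HeliosJobGenerator points this at ~/.default_rap,
    # a file expected to contain nothing but the account identifier on one line.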


class MammouthJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
        if self.queue.name.endswith("@mp2"):
            for pbs in self.pbs_list:
                pbs.resources['nodes'] = re.sub("ppn=[0-9]+", "ppn=1", pbs.resources['nodes'])


class HadesJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
        for pbs in self.pbs_list:
            gpus = re.match(".*gpus=([0-9]+)", pbs.resources['nodes']).group(1)
            pbs.resources['nodes'] = re.sub("ppn=[0-9]+", "ppn={}".format(gpus), pbs.resources['nodes'])
            pbs.resources['nodes'] = re.sub(":gpus=[0-9]+", "", pbs.resources['nodes'])


class GuilliminJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
        return self.specify_account_name_from_env('HOME_GROUP')


# https://wiki.calculquebec.ca/w/Ex%C3%A9cuter_une_t%C3%A2che#tab=tab6
class HeliosJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
        self.specify_account_name_from_file(os.path.join(os.environ['HOME'], ".default_rap"))

        for pbs in self.pbs_list:
            # Remove forbidden ppn option. Default is 2 cores per gpu.
            pbs.resources['nodes'] = re.sub(":ppn=[0-9]+", "", pbs.resources['nodes'])


class SlurmJobGenerator(JobGenerator):

    def __init__(self, *args, **kwargs):
        super(SlurmJobGenerator, self).__init__(*args, **kwargs)

    def _adapt_options(self, pbs):
        # Remove the queue option; there is no queue in Slurm
        if "-q" in pbs.options:
            del pbs.options["-q"]

        # sbatch does not expand shell variables in options; the only
        # substitutions available are filename patterns such as %A (the job
        # array's master job id) and %a (the job array index)
        for option in ['-o', '-e']:
            pbs.options[option] = re.sub(r'"\$PBS_JOBID"', '%A',
                                         pbs.options[option])

        # Convert to Slurm's --export
        #
        # Warning: Slurm does **not** export variables defined locally, such as
        #          variables defined along the command line. For example:
        #          PBS_FILENAME=something sbatch --export=ALL somefile.sh
        #          would *not* export PBS_FILENAME to the script.
        if pbs.options.pop('-V', None) is not None:
            pbs.add_sbatch_options(export='ALL')
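
    # Example (illustrative): the PBS option
    #     -o "/base/logs/job/"$PBS_JOBID".out"
    # becomes
    #     -o "/base/logs/job/%A.out"
    # since sbatch expands %A itself rather than relying on the shell.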

    def _adapt_commands(self, pbs):
        pass

    def _adapt_resources(self, pbs):
        # Set proper option for gpus
        match = re.match(".*gpus=([0-9]+)", pbs.resources['nodes'])
        if match:
            gpus = match.group(1)
            pbs.add_resources(naccelerators=gpus)
            pbs.resources['nodes'] = re.sub(":gpus=[0-9]+", "",
                                            pbs.resources['nodes'])

        # Set proper option for cpus
        match = re.match(".*ppn=([0-9]+)", pbs.resources['nodes'])
        if match:
            ppn = match.group(1)
            pbs.add_resources(ncpus=ppn)
            pbs.resources['nodes'] = re.sub("ppn=[0-9]+", "", pbs.resources['nodes'])

    def _adapt_variable_names(self, pbs):
        for command_id, command in enumerate(pbs.commands):
            pbs.commands[command_id] = command = re.sub(
                r"\$PBS_JOBID", "$SLURM_JOB_ID", command)
            # NOTE: SBATCH_TIMELIMIT is **not** an official Slurm environment
            # variable; it needs to be set in the script.
            pbs.commands[command_id] = command = re.sub(
                r"\$PBS_WALLTIME", "$SBATCH_TIMELIMIT", command)

    def _adapt_prolog(self, pbs):
        # Set SBATCH_TIMELIMIT in the prolog, hence before any code from
        # commands and epilog.
        pbs.add_to_prolog(
            "SBATCH_TIMELIMIT=%s" %
            utils.walltime_to_seconds(pbs.resources["walltime"]))
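
    # Example (illustrative, assuming utils.walltime_to_seconds converts an
    # HH:MM:SS walltime to seconds): a walltime of "01:30:00" puts the line
    # SBATCH_TIMELIMIT=5400 at the top of the generated script.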

    def _add_cluster_specific_rules(self):
        for pbs in self.pbs_list:
            self._adapt_options(pbs)
            self._adapt_resources(pbs)
            self._adapt_variable_names(pbs)
            self._adapt_prolog(pbs)
            self._adapt_commands(pbs)