Completed
Pull Request — master (#166)
by
unknown
29s
created

JobGenerator   A

Complexity

Total Complexity 26

Size/Duplication

Total Lines 133
Duplicated Lines 0 %

Importance

Changes 5
Bugs 0 Features 1
Metric Value
c 5
b 0
f 1
dl 0
loc 133
rs 10
wmc 26

8 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 13 1
A _add_cluster_specific_rules() 0 2 1
B add_pbs_flags() 0 18 5
A specify_account_name_from_env() 0 7 3
B add_sbatch_flags() 0 14 5
A specify_account_name_from_file() 0 9 4
A write_pbs_files() 0 15 2
B _generate_base_pbs() 0 30 5
1
from __future__ import absolute_import
2
3
import os
4
import re
5
from smartdispatch.pbs import PBS
6
from smartdispatch import utils
7
8
9
def job_generator_factory(queue, commands, prolog=None, epilog=None, command_params=None, cluster_name=None, base_path="./"):
    """ Returns a job generator suited for the given cluster.

    Parameters
    ----------
    queue : `Queue` instance
        queue on which commands will be executed
    commands : list of str
        commands to put in PBS files
    prolog : list of str
        code to execute before the commands
    epilog : list of str
        code to execute after the commands
    command_params : dict
        information about the commands
    cluster_name : str
        name of the cluster; unknown names fall back to the generic `JobGenerator`
    base_path : str
        folder where the job logs will be written
    """
    # `None` defaults avoid the shared-mutable-default-argument pitfall while
    # preserving the behavior of the former `[]`/`{}` defaults.
    prolog = prolog if prolog is not None else []
    epilog = epilog if epilog is not None else []
    command_params = command_params if command_params is not None else {}

    # Dispatch table mapping known cluster names to their specialized generator.
    cluster_generators = {
        "guillimin": GuilliminJobGenerator,
        "mammouth": MammouthJobGenerator,
        "helios": HeliosJobGenerator,
        "hades": HadesJobGenerator,
    }
    generator_class = cluster_generators.get(cluster_name, JobGenerator)
    return generator_class(queue, commands, prolog, epilog, command_params, base_path)
class JobGenerator(object):

    """ Offers functionalities to generate PBS files for a given queue.

    Parameters
    ----------
    queue : `Queue` instance
        queue on which commands will be executed
    commands : list of str
        commands to put in PBS files
    prolog : list of str
        code to execute before the commands
    epilog : list of str
        code to execute after the commands
    command_params : dict
        information about the commands
    base_path : str
        folder where the job logs will be written
    """

    def __init__(self, queue, commands, prolog=None, epilog=None, command_params=None, base_path="./"):
        # `None` defaults avoid the shared-mutable-default-argument pitfall;
        # explicit lists/dicts behave exactly as before.
        self.prolog = prolog if prolog is not None else []
        self.commands = commands
        self.epilog = epilog if epilog is not None else []
        self.queue = queue
        # Quoted so the shell expands $PBS_JOBID at runtime; `{ext}` is filled
        # in later with 'out' or 'err'.
        self.job_log_filename = '"{base_path}/logs/job/"$PBS_JOBID".{{ext}}"'.format(base_path=base_path)

        command_params = command_params if command_params is not None else {}
        self.nb_cores_per_command = command_params.get('nb_cores_per_command', 1)
        self.nb_gpus_per_command = command_params.get('nb_gpus_per_command', 1)
        #self.mem_per_command = command_params.get('mem_per_command', 0.0)

        self.pbs_list = self._generate_base_pbs()
        self._add_cluster_specific_rules()

    def _add_cluster_specific_rules(self):
        """ Hook for subclasses to alter the generated PBS files for a specific cluster. """
        pass

    def add_pbs_flags(self, flags):
        """ Adds PBS flags to every generated PBS file.

        Parameters
        ----------
        flags : list of str
            resource flags of the form "-l<resource>=<value>" or one-letter
            option flags of the form "-X<value>"

        Raises
        ------
        ValueError
            If a flag does not start with '-', or a resource flag has no '='.
        """
        resources = {}
        options = {}

        for flag in flags:
            if flag.startswith('-l'):
                resource = flag[2:]
                split = resource.find('=')
                if split == -1:
                    # Without '=' the slices below would produce garbage keys.
                    raise ValueError("Invalid PBS flag ({})".format(flag))
                resources[resource[:split]] = resource[split + 1:]
            elif flag.startswith('-'):
                # One-letter option: the letter is the key, the rest is the value.
                options[flag[1:2]] = flag[2:]
            else:
                raise ValueError("Invalid PBS flag ({})".format(flag))

        for pbs in self.pbs_list:
            pbs.add_resources(**resources)
            pbs.add_options(**options)

    def add_sbatch_flags(self, flags):
        """ Adds SBATCH options to every generated PBS file.

        Parameters
        ----------
        flags : list of str
            flags of the form "--name=value" or "-n=value"

        Raises
        ------
        ValueError
            If a flag does not start with '-' or lacks a '=' separator.
        """
        options = {}

        for flag in flags:
            split = flag.find('=')
            if split == -1:
                # Without '=' the slices below would silently produce garbage keys.
                raise ValueError("Invalid SBATCH flag ({})".format(flag))
            if flag.startswith('--'):
                options[flag[2:split]] = flag[split + 1:]
            elif flag.startswith('-'):
                options[flag[1:split]] = flag[split + 1:]
            else:
                raise ValueError("Invalid SBATCH flag ({})".format(flag))

        for pbs in self.pbs_list:
            pbs.add_sbatch_options(**options)

    def _generate_base_pbs(self):
        """ Generates PBS files allowing the execution of every commands on the given queue.

        Returns
        -------
        list of `PBS` instances
            one PBS file per node needed to run all the commands
        """
        nb_commands_per_node = self.queue.nb_cores_per_node // self.nb_cores_per_command

        # On GPU queues the GPU count may be the binding constraint.
        if self.queue.nb_gpus_per_node > 0 and self.nb_gpus_per_command > 0:
            nb_commands_per_node = min(nb_commands_per_node, self.queue.nb_gpus_per_node // self.nb_gpus_per_command)

        pbs_files = []
        # Distribute equally the jobs among the PBS files and generate those files
        for commands in utils.chunks(self.commands, n=nb_commands_per_node):
            pbs = PBS(self.queue.name, self.queue.walltime)

            # TODO Move the add_options into the JobManager once created.
            pbs.add_options(o=self.job_log_filename.format(ext='out'), e=self.job_log_filename.format(ext='err'))

            # Set resource: nodes
            resource = "1:ppn={ppn}".format(ppn=len(commands) * self.nb_cores_per_command)
            if self.queue.nb_gpus_per_node > 0:
                resource += ":gpus={gpus}".format(gpus=len(commands) * self.nb_gpus_per_command)

            pbs.add_resources(nodes=resource)

            pbs.add_modules_to_load(*self.queue.modules)
            pbs.add_to_prolog(*self.prolog)
            pbs.add_commands(*commands)
            pbs.add_to_epilog(*self.epilog)

            pbs_files.append(pbs)

        return pbs_files

    def write_pbs_files(self, pbs_dir="./"):
        """ Writes PBS files allowing the execution of every commands on the given queue.

        Parameters
        ----------
        pbs_dir : str
            folder where to save pbs files

        Returns
        -------
        list of str
            paths of the PBS files that were written
        """
        pbs_filenames = []
        for i, pbs in enumerate(self.pbs_list):
            pbs_filename = os.path.join(pbs_dir, 'job_commands_' + str(i) + '.sh')
            pbs.save(pbs_filename)
            pbs_filenames.append(pbs_filename)

        return pbs_filenames

    def specify_account_name_from_env(self, environment_variable_name):
        """ Sets the account name (PBS option -A) on every PBS file.

        The environment variable is expected to hold a path whose last
        component is the account name.

        Raises
        ------
        ValueError
            If the environment variable is not defined.
        """
        if environment_variable_name not in os.environ:
            raise ValueError("Undefined environment variable: ${}. Please, provide your account name!".format(environment_variable_name))

        account_name = os.path.basename(os.path.realpath(os.getenv(environment_variable_name)))
        for pbs in self.pbs_list:
            pbs.add_options(A=account_name)

    def specify_account_name_from_file(self, rapid_filename):
        """ Sets the account name (PBS option -A) on every PBS file from a file's content.

        Raises
        ------
        ValueError
            If `rapid_filename` does not exist.
        """
        if not os.path.isfile(rapid_filename):
            raise ValueError("Account name file {} does not exist. Please, provide your account name!".format(rapid_filename))

        with open(rapid_filename, 'r') as rapid_file:
            account_name = rapid_file.read().strip()

        for pbs in self.pbs_list:
            pbs.add_options(A=account_name)
class MammouthJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
        """ On queues of the mp2 partition, force one process per node (ppn=1). """
        if not self.queue.name.endswith("@mp2"):
            return

        for pbs in self.pbs_list:
            pbs.resources['nodes'] = re.sub("ppn=[0-9]+", "ppn=1", pbs.resources['nodes'])
class HadesJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
        """ Request as many cores (ppn) as GPUs, then drop the gpus token from the nodes resource. """
        for pbs in self.pbs_list:
            nodes_spec = pbs.resources['nodes']
            gpu_count = re.match(".*gpus=([0-9]+)", nodes_spec).group(1)
            nodes_spec = re.sub("ppn=[0-9]+", "ppn={}".format(gpu_count), nodes_spec)
            pbs.resources['nodes'] = re.sub(":gpus=[0-9]+", "", nodes_spec)
class GuilliminJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
        """ Guillimin requires an account name, derived here from the $HOME_GROUP path. """
        self.specify_account_name_from_env('HOME_GROUP')
# https://wiki.calculquebec.ca/w/Ex%C3%A9cuter_une_t%C3%A2che#tab=tab6
class HeliosJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
        """ Set the RAP account name from ~/.default_rap and strip the ppn option. """
        default_rap_file = os.path.join(os.environ['HOME'], ".default_rap")
        self.specify_account_name_from_file(default_rap_file)

        for pbs in self.pbs_list:
            # Remove forbidden ppn option. Default is 2 cores per gpu.
            pbs.resources['nodes'] = re.sub(":ppn=[0-9]+", "", pbs.resources['nodes'])
class SlurmJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
        """ Translate the PBS-style ``nodes`` resource into SLURM-friendly resources.

        Moves the ppn and gpus counts out of the ``nodes`` resource into the
        ``ncpus`` and ``naccelerators`` resources.
        """
        for pbs in self.pbs_list:
            nodes_spec = pbs.resources['nodes']

            ppn = re.match(".*ppn=([0-9]+)", nodes_spec).group(1)
            # Strip ":ppn=N" including the separator; the previous pattern
            # ("ppn=[0-9]+") left a dangling ':' in the resource string.
            nodes_spec = re.sub(":ppn=[0-9]+", "", nodes_spec)
            pbs.add_resources(ncpus=ppn)

            # Queues without GPUs have no gpus token; guard against the
            # AttributeError the unconditional re.match(...).group(1) raised.
            gpus_match = re.match(".*gpus=([0-9]+)", nodes_spec)
            if gpus_match is not None:
                nodes_spec = re.sub(":gpus=[0-9]+", "", nodes_spec)
                pbs.add_resources(naccelerators=gpus_match.group(1))

            pbs.resources['nodes'] = nodes_spec