Completed · Pull Request — master (#153) · by unknown · 58s

JobGenerator.specify_account_name_from_env()   A

Complexity    Conditions 3
Size          Total Lines 7
Duplication   Lines 0, Ratio 0 %
Importance    Changes 2, Bugs 0, Features 1

Metric   Value
cc       3
dl       0
loc      7
rs       9.4285
c        2
b        0
f        1
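
The cyclomatic complexity of 3 reported above is consistent with the body of specify_account_name_from_env() in the listing below: one if guard plus one for loop on top of the base path (1 + 1 + 1 = 3), and the method spans 7 lines. The tool behind these figures is not identified in the report; purely as an illustrative sketch, a radon-style check (radon is assumed here only as an example analyzer, and job_generator.py is a hypothetical local filename) would compute a comparable value:

# Illustrative only: radon is one common analyzer for cyclomatic complexity (cc).
from radon.complexity import cc_visit

with open("job_generator.py") as source_file:  # hypothetical local copy of the file below
    blocks = cc_visit(source_file.read())

for block in blocks:
    if block.name == "specify_account_name_from_env":
        print(block.name, block.complexity)  # expected: 3 (base path + if + for)
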
from __future__ import absolute_import

import os
import re
from smartdispatch.pbs import PBS
from smartdispatch import utils


def job_generator_factory(queue, commands, command_params={}, cluster_name=None, base_path="./"):
    if cluster_name == "guillimin":
        return GuilliminJobGenerator(queue, commands, command_params, base_path)
    elif cluster_name == "mammouth":
        return MammouthJobGenerator(queue, commands, command_params, base_path)
    elif cluster_name == "helios":
        return HeliosJobGenerator(queue, commands, command_params, base_path)
    elif cluster_name == "hades":
        return HadesJobGenerator(queue, commands, command_params, base_path)

    return JobGenerator(queue, commands, command_params, base_path)


class JobGenerator(object):

    """ Offers functionality to generate PBS files for a given queue.

    Parameters
    ----------
    queue : `Queue` instance
        queue on which commands will be executed
    commands : list of str
        commands to put in PBS files
    command_params : dict
        information about the commands
    """

    def __init__(self, queue, commands, command_params={}, base_path="./"):
        self.commands = commands
        self.queue = queue
        self.job_log_filename = '"{base_path}/logs/job/"$PBS_JOBID".{{ext}}"'.format(base_path=base_path)

        self.nb_cores_per_command = command_params.get('nb_cores_per_command', 1)
        self.nb_gpus_per_command = command_params.get('nb_gpus_per_command', 1)
        self.mem_per_command = command_params.get('mem_per_command', None)

        self.pbs_list = self._generate_base_pbs()
        self._add_cluster_specific_rules()

    def _add_cluster_specific_rules(self):
        pass

    def add_pbs_flags(self, flags):
        resources = {}
        options = {}

        for flag in flags:
            if flag.startswith('-l'):
                resource = flag[2:]
                split = resource.find('=')
                resources[resource[:split]] = resource[split+1:]
            elif flag.startswith('-'):
                options[flag[1:2]] = flag[2:]
            else:
                raise ValueError("Invalid PBS flag ({})".format(flag))

        for pbs in self.pbs_list:
            pbs.add_resources(**resources)
            pbs.add_options(**options)

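    # Example of how add_pbs_flags() above splits flags (illustrative values):
    #   "-lwalltime=24:00:00"  -> resources["walltime"] = "24:00:00"
    #   "-qtest"               -> options["q"] = "test"
    # Any flag that does not start with "-" raises a ValueError.
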
    def _generate_base_pbs(self):
71
        """ Generates PBS files allowing the execution of every commands on the given queue. """
72
        nb_commands_per_node = self.queue.nb_cores_per_node // self.nb_cores_per_command
73
74
        if self.queue.nb_gpus_per_node > 0 and self.nb_gpus_per_command > 0:
75
            nb_commands_per_node = min(nb_commands_per_node, self.queue.nb_gpus_per_node // self.nb_gpus_per_command)
76
77
        # Limit number of running commands by the amount of available memory on the node.
78
        if self.mem_per_command is not None:
79
            nb_commands_per_node = min(nb_commands_per_node, self.queue.mem_per_node // self.mem_per_command)
80
            mem_per_command = self.mem_per_command
81
        else:
82
            mem_per_command = self.queue.mem_per_node // nb_commands_per_node
83
84
        pbs_files = []
85
        # Distribute equally the jobs among the PBS files and generate those files
86
        for i, commands in enumerate(utils.chunks(self.commands, n=nb_commands_per_node)):
87
            pbs = PBS(self.queue.name, self.queue.walltime)
88
89
            # TODO Move the add_options into the JobManager once created.
90
            pbs.add_options(o=self.job_log_filename.format(ext='out'), e=self.job_log_filename.format(ext='err'))
91
92
            # Set resource: nodes
93
            resource = "1:ppn={ppn}".format(ppn=len(commands) * self.nb_cores_per_command)
94
            if self.queue.nb_gpus_per_node > 0:
95
                resource += ":gpus={gpus}".format(gpus=len(commands) * self.nb_gpus_per_command)
96
            pbs.add_resources(nodes=resource)
97
98
            if mem_per_command is not None:
99
                resource = "{mem}Gb".format(mem=len(commands) * mem_per_command)
100
                pbs.add_resources(mem=resource)
101
102
            pbs.add_modules_to_load(*self.queue.modules)
103
            pbs.add_commands(*commands)
104
105
            pbs_files.append(pbs)
106
107
        return pbs_files
108
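    # Worked example for _generate_base_pbs() above, with hypothetical numbers:
    # on a queue offering 16 cores, 8 GPUs and 32 Gb per node, commands that
    # each need 4 cores and 2 GPUs give
    #   nb_commands_per_node = min(16 // 4, 8 // 2) = 4
    # and, with no explicit mem_per_command, mem_per_command = 32 // 4 = 8 Gb,
    # so a full PBS file requests nodes=1:ppn=16:gpus=8 and mem=32Gb.
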
    def write_pbs_files(self, pbs_dir="./"):
        """ Writes PBS files allowing the execution of every command on the given queue.

        Parameters
        ----------
        pbs_dir : str
            folder in which to save the PBS files
        """
        pbs_filenames = []
        for i, pbs in enumerate(self.pbs_list):
            pbs_filename = os.path.join(pbs_dir, 'job_commands_' + str(i) + '.sh')
            pbs.save(pbs_filename)
            pbs_filenames.append(pbs_filename)

        return pbs_filenames

    def specify_account_name_from_env(self, environment_variable_name):
        if environment_variable_name not in os.environ:
            raise ValueError("Undefined environment variable: ${}. Please provide your account name!".format(environment_variable_name))

        account_name = os.path.basename(os.path.realpath(os.getenv(environment_variable_name)))
        for pbs in self.pbs_list:
            pbs.add_options(A=account_name)

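    # Illustrative example for specify_account_name_from_env() above: with
    # HOME_GROUP=/home/groups/abc-123-aa (a made-up path, no symlink involved),
    # realpath + basename yield account_name "abc-123-aa", which is added to
    # every PBS file as its -A (account) option.
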
    def specify_account_name_from_file(self, rapid_filename):
        if not os.path.isfile(rapid_filename):
            raise ValueError("Account name file {} does not exist. Please provide your account name!".format(rapid_filename))

        with open(rapid_filename, 'r') as rapid_file:
            account_name = rapid_file.read().strip()

        for pbs in self.pbs_list:
            pbs.add_options(A=account_name)


class MammouthJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
        if self.queue.name.endswith("@mp2"):
            for pbs in self.pbs_list:
                pbs.resources['nodes'] = re.sub("ppn=[0-9]+", "ppn=1", pbs.resources['nodes'])


class HadesJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
        for pbs in self.pbs_list:
            gpus = re.match(".*gpus=([0-9]+)", pbs.resources['nodes']).group(1)
            pbs.resources['nodes'] = re.sub("ppn=[0-9]+", "ppn={}".format(gpus), pbs.resources['nodes'])
            pbs.resources['nodes'] = re.sub(":gpus=[0-9]+", "", pbs.resources['nodes'])


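# Illustrative example of the HadesJobGenerator rewrite above: a nodes resource
# such as "1:ppn=4:gpus=2" (made-up values) becomes "1:ppn=2", i.e. ppn is
# forced to the number of GPUs and the gpus= part is dropped.

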
class GuilliminJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
        return self.specify_account_name_from_env('HOME_GROUP')


# https://wiki.calculquebec.ca/w/Ex%C3%A9cuter_une_t%C3%A2che#tab=tab6
class HeliosJobGenerator(JobGenerator):

    def _add_cluster_specific_rules(self):
        self.specify_account_name_from_file(os.path.join(os.environ['HOME'], ".default_rap"))

        for pbs in self.pbs_list:
            # Remove forbidden ppn option. Default is 2 cores per gpu.
            pbs.resources['nodes'] = re.sub(":ppn=[0-9]+", "", pbs.resources['nodes'])
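
For context, here is a minimal usage sketch of this module. It is not part of the analyzed file: `queue` stands in for a smartdispatch Queue instance exposing the attributes read above (name, walltime, modules, nb_cores_per_node, nb_gpus_per_node, mem_per_node), and the commands and flags are made up; only job_generator_factory, add_pbs_flags and write_pbs_files come from the code itself.

# queue: a smartdispatch Queue instance (hypothetical stand-in here).
commands = ["python train.py --lr 0.01", "python train.py --lr 0.1"]
command_params = {'nb_cores_per_command': 2, 'nb_gpus_per_command': 1}

# On guillimin this also expects $HOME_GROUP to be set (see specify_account_name_from_env).
generator = job_generator_factory(queue, commands, command_params, cluster_name="guillimin")
generator.add_pbs_flags(['-lwalltime=12:00:00'])  # adds a walltime resource to every PBS file
pbs_filenames = generator.write_pbs_files("commands/")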