Completed
Pull Request — master (#103)
by Mathieu
01:01
created

smartdispatch.HadesJobGenerator.generate_pbs()   A

Complexity

Conditions 2

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 9
rs 9.6666
1
from __future__ import absolute_import
2
3
import os
4
import re
5
from smartdispatch.pbs import PBS
6
from smartdispatch import utils
7
8
9
def job_generator_factory(queue, commands, command_params=None, cluster_name=None, base_path="./"):
    """ Builds the job generator matching a given cluster.

    Parameters
    ----------
    queue : `Queue` instance
        queue on which commands will be executed
    commands : list of str
        commands to put in PBS files
    command_params : dict, optional
        information about the commands; `None` is treated as an empty dict
    cluster_name : str, optional
        "guillimin", "mammouth", "helios" and "hades" select their dedicated
        generator; any other value selects the generic `JobGenerator`
    base_path : str
        base folder forwarded to the generator

    Returns
    -------
    `JobGenerator` instance (possibly one of its cluster-specific subclasses)
    """
    # A `None` default avoids sharing one mutable dict between every call.
    if command_params is None:
        command_params = {}

    if cluster_name == "guillimin":
        return GuilliminJobGenerator(queue, commands, command_params, base_path)
    elif cluster_name == "mammouth":
        return MammouthJobGenerator(queue, commands, command_params, base_path)
    elif cluster_name == "helios":
        return HeliosJobGenerator(queue, commands, command_params, base_path)
    elif cluster_name == "hades":
        return HadesJobGenerator(queue, commands, command_params, base_path)

    # Unknown or unspecified cluster: fall back on the generic generator.
    return JobGenerator(queue, commands, command_params, base_path)
20
21
22
class JobGenerator(object):

    """ Offers functionalities to generate PBS files for a given queue.

    Parameters
    ----------
    queue : `Queue` instance
        queue on which commands will be executed
    commands : list of str
        commands to put in PBS files
    command_params : dict, optional
        information about the commands; the keys read are
        'nb_cores_per_command' and 'nb_gpus_per_command' (both default to 1).
        `None` is treated as an empty dict.
    base_path : str
        folder containing the `logs/job/` folder where job logs are written
    """

    def __init__(self, queue, commands, command_params=None, base_path="./"):
        # A `None` default avoids sharing one mutable dict between instances.
        if command_params is None:
            command_params = {}

        self.commands = commands
        self.queue = queue
        # `{{ext}}` survives this first format() as `{ext}`; it is filled in
        # later with 'out' or 'err' when the PBS options are set.
        self.job_log_filename = '"{base_path}/logs/job/"$PBS_JOBID".{{ext}}"'.format(base_path=base_path)

        self.nb_cores_per_command = command_params.get('nb_cores_per_command', 1)
        self.nb_gpus_per_command = command_params.get('nb_gpus_per_command', 1)

    def generate_pbs(self):
        """ Generates PBS files allowing the execution of every commands on the given queue.

        Returns
        -------
        list of `PBS` instances
            one per group of commands sharing a node

        Raises
        ------
        ValueError
            if a single command needs more cores or GPUs than one node offers
        """
        nb_commands_per_node = self.queue.nb_cores_per_node // self.nb_cores_per_command

        if self.queue.nb_gpus_per_node > 0 and self.nb_gpus_per_command > 0:
            nb_commands_per_node = min(nb_commands_per_node, self.queue.nb_gpus_per_node // self.nb_gpus_per_command)

        # A chunk size of zero cannot distribute anything; fail with a clear message.
        if nb_commands_per_node < 1:
            raise ValueError("A single command requires more resources than one node offers "
                             "({} cores and {} gpus per command; {} cores and {} gpus per node).".format(
                                 self.nb_cores_per_command, self.nb_gpus_per_command,
                                 self.queue.nb_cores_per_node, self.queue.nb_gpus_per_node))

        pbs_files = []
        # Distribute equally the jobs among the PBS files and generate those files
        for commands in utils.chunks(self.commands, n=nb_commands_per_node):
            pbs = PBS(self.queue.name, self.queue.walltime)

            # TODO Move the add_options into the JobManager once created.
            pbs.add_options(o=self.job_log_filename.format(ext='out'), e=self.job_log_filename.format(ext='err'))

            # Set resource: nodes
            resource = "1:ppn={ppn}".format(ppn=len(commands) * self.nb_cores_per_command)
            if self.queue.nb_gpus_per_node > 0:
                resource += ":gpus={gpus}".format(gpus=len(commands) * self.nb_gpus_per_command)

            pbs.add_resources(nodes=resource)

            pbs.add_modules_to_load(*self.queue.modules)
            pbs.add_commands(*commands)

            pbs_files.append(pbs)

        return pbs_files

    def write_pbs_files(self, pbs_dir="./"):
        """ Writes PBS files allowing the execution of every commands on the given queue.

        Parameters
        ----------
        pbs_dir : str
            folder where to save pbs files

        Returns
        -------
        list of str
            paths of the written files, named `job_commands_<index>.sh`
        """
        pbs_list = self.generate_pbs()
        pbs_filenames = []
        for i, pbs in enumerate(pbs_list):
            pbs_filename = os.path.join(pbs_dir, 'job_commands_' + str(i) + '.sh')
            pbs.save(pbs_filename)
            pbs_filenames.append(pbs_filename)

        return pbs_filenames

    def generate_pbs_with_account_name_from_env(self, environment_variable_name):
        """ Generates the generic PBS files and adds an account name (`-A`) option to each.

        The account name is the last component of the resolved path stored in
        the given environment variable.

        Parameters
        ----------
        environment_variable_name : str
            name of the environment variable holding the account path

        Returns
        -------
        list of `PBS` instances

        Raises
        ------
        ValueError
            if the environment variable is not defined
        """
        # Validate the environment first so no PBS object is built for nothing.
        if environment_variable_name not in os.environ:
            raise ValueError("Undefined environment variable: ${}. Please, provide your account name!".format(environment_variable_name))

        account_name = os.path.basename(os.path.realpath(os.environ[environment_variable_name]))

        # Call the base implementation explicitly: subclasses invoke this helper
        # from their own `generate_pbs`, so `self.generate_pbs()` would recurse.
        pbs_list = JobGenerator.generate_pbs(self)
        for pbs in pbs_list:
            pbs.add_options(A=account_name)

        return pbs_list
102
103
104
class MammouthJobGenerator(JobGenerator):

    """ Job generator forcing `ppn=1` on queues whose name ends with "@mp2". """

    def generate_pbs(self):
        """ Generates the generic PBS files, then rewrites `ppn` to 1 for "@mp2" queues.

        Returns
        -------
        list of `PBS` instances
        """
        generated = JobGenerator.generate_pbs(self)

        # Other queues keep the generic resource request as is.
        if not self.queue.name.endswith("@mp2"):
            return generated

        for job in generated:
            nodes_spec = job.resources['nodes']
            job.resources['nodes'] = re.sub("ppn=[0-9]+", "ppn=1", nodes_spec)

        return generated
114
115
116
class HadesJobGenerator(JobGenerator):

    """ Job generator expressing the GPU request through `ppn` and dropping the `gpus` option. """

    def generate_pbs(self):
        """ Generates the generic PBS files, then sets `ppn` to the number of GPUs requested.

        For each PBS file, the `ppn` value is replaced by the value of the
        `gpus` option, and the `gpus` option is removed. A resource request
        without any `gpus` option is left untouched.

        Returns
        -------
        list of `PBS` instances
        """
        pbs_list = JobGenerator.generate_pbs(self)

        for pbs in pbs_list:
            nodes = pbs.resources['nodes']
            gpus_match = re.search("gpus=([0-9]+)", nodes)
            if gpus_match is None:
                # The generic generator only adds `gpus=` when the queue has
                # GPUs; without it there is nothing to convert.
                continue

            nodes = re.sub("ppn=[0-9]+", "ppn={}".format(gpus_match.group(1)), nodes)
            pbs.resources['nodes'] = re.sub(":gpus=[0-9]+", "", nodes)

        return pbs_list
127
128
129
class GuilliminJobGenerator(JobGenerator):

    """ Job generator adding the account name read from the `HOME_GROUP` environment variable. """

    def generate_pbs(self):
        """ Generates PBS files carrying the account name taken from `$HOME_GROUP`.

        Returns
        -------
        list of `PBS` instances
        """
        account_variable = 'HOME_GROUP'
        return self.generate_pbs_with_account_name_from_env(account_variable)
133
134
135
# https://wiki.calculquebec.ca/w/Ex%C3%A9cuter_une_t%C3%A2che#tab=tab6
136
class HeliosJobGenerator(JobGenerator):

    """ Job generator adding the `RAP` account name, dropping `ppn` and requesting an even number of GPUs. """

    def generate_pbs(self):
        """ Generates PBS files carrying the account name taken from `$RAP`.

        For each PBS file, the `ppn` option is removed and an odd `gpus` value
        is raised to the next even number. A resource request without any
        `gpus` option only has its `ppn` option removed.

        Returns
        -------
        list of `PBS` instances
        """
        pbs_list = self.generate_pbs_with_account_name_from_env('RAP')

        for pbs in pbs_list:
            # Remove forbidden ppn option. Default is 5 cores per 2 gpu.
            nodes = re.sub(":ppn=[0-9]+", "", pbs.resources['nodes'])

            # Nb of GPUs has to be a multiple of 2
            gpus_match = re.search("gpus=([0-9]+)", nodes)
            if gpus_match is not None:
                nb_gpus = int(gpus_match.group(1))
                if nb_gpus % 2 != 0:
                    nodes = re.sub("gpus=[0-9]+", "gpus={0}".format(nb_gpus + 1), nodes)

            pbs.resources['nodes'] = nodes

        return pbs_list
151