Completed
Pull Request — master (#166)
by
unknown
24s
created

TestJobGenerator.test_add_pbs_flags_invalid()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
dl 0
loc 2
rs 10
c 1
b 0
f 0
1
from nose.tools import assert_true, assert_false, assert_equal, assert_raises
2
import unittest
3
4
import os
5
import tempfile
6
import shutil
7
from smartdispatch.queue import Queue
8
from smartdispatch.job_generator import JobGenerator, job_generator_factory
9
from smartdispatch.job_generator import HeliosJobGenerator, HadesJobGenerator
10
from smartdispatch.job_generator import GuilliminJobGenerator, MammouthJobGenerator
11
from smartdispatch.job_generator import SlurmJobGenerator
12
13
14
class TestJobGenerator(object):
15
    pbs_flags = ['-lfeature=k80', '-lwalltime=42:42', '-lnodes=6:gpus=66', '-m', '-A123-asd-11', '-t10,20,30']
16
    sbatch_flags = ['--qos=high', '--output=file.out', '-Cminmemory']
17
18
    def setUp(self):
19
        self.testing_dir = tempfile.mkdtemp()
20
        self.cluster_name = "skynet"
21
        self.name = "9000@hal"
22
        self.walltime = "10:00"
23
        self.cores = 42
24
        self.gpus = 42
25
        self.mem_per_node = 32
26
        self.modules = ["cuda", "python"]
27
28
        self.queue = Queue(self.name, self.cluster_name, self.walltime, self.cores, 0, self.mem_per_node, self.modules)
29
        self.queue_gpu = Queue(self.name, self.cluster_name, self.walltime, self.cores, self.gpus, self.mem_per_node, self.modules)
30
31
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
32
        self.prolog = ["echo prolog"]
33
        self.epilog = ["echo epilog"]
34
35 View Code Duplication
    def tearDown(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
36
        shutil.rmtree(self.testing_dir)
37
38
    def test_generate_pbs(self):
39
        job_generator = JobGenerator(self.queue, self.commands, prolog=self.prolog, epilog=self.epilog)
40
41
        # Test nb_cores_per_command argument
42
        # Should needs one PBS file
43
        assert_equal(len(job_generator.pbs_list), 1)
44
        assert_equal(job_generator.pbs_list[0].commands, self.commands)
45 View Code Duplication
        assert_equal(job_generator.pbs_list[0].prolog, self.prolog)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
46
        assert_equal(job_generator.pbs_list[0].epilog, self.epilog)
47
48
    def test_generate_pbs2_cpu(self):
49
        # Should needs two PBS file
50
        command_params = {'nb_cores_per_command': self.cores // 2}
51
        job_generator = JobGenerator(self.queue, self.commands, command_params=command_params)
52
        assert_equal(len(job_generator.pbs_list), 2)
53 View Code Duplication
        assert_equal(job_generator.pbs_list[0].commands, self.commands[:2])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
54
        assert_equal(job_generator.pbs_list[1].commands, self.commands[2:])
55
56
    def test_generate_pbs4_cpu(self):
57
        # Should needs four PBS file
58
        command_params = {'nb_cores_per_command': self.cores}
59
        job_generator = JobGenerator(self.queue, self.commands, command_params=command_params)
60
        assert_equal(len(job_generator.pbs_list), 4)
61
        assert_equal([pbs.commands[0] for pbs in job_generator.pbs_list], self.commands)
62
63
        # Since queue has no gpus it should not be specified in PBS resource `nodes`
64
        assert_true('gpus' not in job_generator.pbs_list[0].resources['nodes'])
65
66
        # Test modules to load
67 View Code Duplication
        # Check if needed modules for this queue are included in the PBS file
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
68
        assert_equal(job_generator.pbs_list[0].modules, self.modules)
69
70
    def test_generate_pbs2_gpu(self):
71
        # Test nb_gpus_per_command argument
72
        # Should needs two PBS file
73
        command_params = {'nb_gpus_per_command': self.gpus // 2}
74
        job_generator = JobGenerator(self.queue_gpu, self.commands, command_params=command_params)
75
        assert_equal(len(job_generator.pbs_list), 2)
76 View Code Duplication
        assert_equal(job_generator.pbs_list[0].commands, self.commands[:2])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
77
        assert_equal(job_generator.pbs_list[1].commands, self.commands[2:])
78
79
    def test_generate_pbs4_gpu(self):
80
        # Should needs four PBS files
81
        command_params = {'nb_gpus_per_command': self.gpus}
82
        job_generator = JobGenerator(self.queue_gpu, self.commands, command_params=command_params)
83
        assert_equal(len(job_generator.pbs_list), 4)
84
        assert_equal([pbs.commands[0] for pbs in job_generator.pbs_list], self.commands)
85
86
        # Since queue has gpus it should be specified in PBS resource `nodes`
87
        assert_true('gpus' in job_generator.pbs_list[0].resources['nodes'])
88
89
        # Test modules to load
90
        # Check if needed modules for this queue are included in the PBS file
91
        assert_equal(job_generator.pbs_list[0].modules, self.modules)
92
93
    def test_write_pbs_files(self):
94
        commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
95
        command_params = {'nb_cores_per_command': self.cores}
96
        job_generator = JobGenerator(self.queue, commands, command_params=command_params)
97
        filenames = job_generator.write_pbs_files(self.testing_dir)
98
        assert_equal(len(filenames), 4)
99
100
    def _test_add_pbs_flags(self, flags):
101
        job_generator = JobGenerator(self.queue, self.commands)
102
        job_generator.add_pbs_flags(flags)
103
104
        resources = []
105
        options = []
106
107
        for flag in flags:
108
            if flag.startswith('-l'):
109
                resources += [flag[:2] + ' ' + flag[2:]]
110
            elif flag.startswith('-'):
111
                options += [(flag[:2] + ' ' + flag[2:]).strip()]
112
113
        for pbs in job_generator.pbs_list:
114
            pbs_str = pbs.__str__()
115
            for flag in resources:
116
                assert_equal(pbs_str.count(flag), 1)
117
                assert_equal(pbs_str.count(flag[:flag.find('=')]), 1)
118
            for flag in options:
119
                assert_equal(pbs_str.count(flag), 1)
120
121
    def test_add_pbs_flags(self):
122
        for flag in self.pbs_flags:
123
            yield self._test_add_pbs_flags, [flag]
124
125
        yield self._test_add_pbs_flags, self.pbs_flags
126
127
    def test_add_pbs_flags_invalid(self):
128
        assert_raises(ValueError, self._test_add_pbs_flags, 'weeee')
129
130
    def test_add_pbs_flags_invalid_resource(self):
131
        assert_raises(ValueError, self._test_add_pbs_flags, '-l weeee')
132
133
    def _test_add_sbatch_flags(self, flags):
134
        job_generator = JobGenerator(self.queue, self.commands)
135
        job_generator.add_sbatch_flags(flags)
136
        options = []
137
138
        for flag in flags:
139
            if flag.startswith('--'):
140
                options += [flag]
141
            elif flag.startswith('-'):
142
                options += [(flag[:2] + ' ' + flag[2:]).strip()]
143
144
        for pbs in job_generator.pbs_list:
145
            pbs_str = pbs.__str__()
146
            for flag in options:
147
                assert_equal(pbs_str.count(flag), 1)
148
149
    def test_add_sbatch_flags(self):
150
        for flag in self.sbatch_flags:
151
            yield self._test_add_sbatch_flags, [flag]
152
153
        yield self._test_add_sbatch_flags, [flag]
154
155
    def test_add_sbatch_flag_invalid(self):
156
        invalid_flags = ["--qos high", "gpu", "-lfeature=k80"]
157
        for flag in invalid_flags:
158
            assert_raises(ValueError, self._test_add_sbatch_flags, "--qos high")
159
160
class TestGuilliminQueue(object):
161
162
    def setUp(self):
163
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
164
        self.queue = Queue("test", "guillimin", "00:01", 1, 1, 1)
165
166
        self.bak_env_home_group = os.environ.get('HOME_GROUP')
167
        if self.bak_env_home_group is not None:
168
            del os.environ['HOME_GROUP']
169
170
    def tearDown(self):
171
        if self.bak_env_home_group is not None:
172
            os.environ['HOME_GROUP'] = self.bak_env_home_group
173
174
    def test_generate_pbs_no_home(self):
175
        assert_raises(ValueError, GuilliminJobGenerator, self.queue, self.commands)
176
177
    def test_generate_pbs(self):
178
        os.environ['HOME_GROUP'] = "/path/to/group"
179
        job_generator = GuilliminJobGenerator(self.queue, self.commands)
180
        pbs = job_generator.pbs_list[0]
181
        assert_true("-A" in pbs.options)
182
        assert_true(pbs.options["-A"] == 'group')
183
184
185
class TestMammouthQueue(object):
186
187
    def setUp(self):
188
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
189
        self.queue = Queue("qtest@mp2", "mammouth")
190
191
    def test_generate_pbs(self):
192
        job_generator = MammouthJobGenerator(self.queue, self.commands)
193
194
        assert_true("ppn=1" in str(job_generator.pbs_list[0]))
195
196
197
class TestHeliosQueue(object):
198
199
    def setUp(self):
200
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
201
        self.queue = Queue("gpu_8", "helios")
202
203
        self._home_backup = os.environ['HOME']
204
        os.environ['HOME'] = tempfile.mkdtemp()
205
206
        self.rap_filename = os.path.join(os.environ['HOME'], ".default_rap")
207
        if os.path.isfile(self.rap_filename):
208
            raise Exception("Test fail: {} should not be there.".format(self.rap_filename))
209
        else:
210
            self.rapid = 'asd-123-ab'
211
            with open(self.rap_filename, 'w+') as rap_file:
212
                rap_file.write(self.rapid)
213
214
        self.job_generator = HeliosJobGenerator(self.queue, self.commands)
215
216
    def tearDown(self):
217
        shutil.rmtree(os.environ['HOME'])
218
        os.environ['HOME'] = self._home_backup
219
220
    def test_generate_pbs_invalid_group(self):
221
        os.remove(self.rap_filename)
222
223
        assert_raises(ValueError, HeliosJobGenerator, self.queue, self.commands)
224
225
    def test_generate_pbs_valid_group(self):
226
        pbs = self.job_generator.pbs_list[0]
227
228
        assert_equal(pbs.options['-A'], self.rapid)
229
230
    def test_generate_pbs_ppn_is_absent(self):
231
        assert_false("ppn=" in str(self.job_generator.pbs_list[0]))
232
233
    def test_generate_pbs_even_nb_commands(self):
234
        assert_true("gpus=4" in str(self.job_generator.pbs_list[0]))
235
236
    def test_generate_pbs_odd_nb_commands(self):
237
        commands = ["echo 1", "echo 2", "echo 3", "echo 4", "echo 5"]
238
        job_generator = HeliosJobGenerator(self.queue, commands)
239
240
        assert_true("gpus=5" in str(job_generator.pbs_list[0]))
241
242
243
class TestHadesQueue(object):
244
245
    def setUp(self):
246
        self.queue = Queue("@hades", "hades")
247
248
        self.commands4 = ["echo 1", "echo 2", "echo 3", "echo 4"]
249
        job_generator = HadesJobGenerator(self.queue, self.commands4)
250
        self.pbs4 = job_generator.pbs_list
251
252
        # 8 commands chosen because there is 8 cores but still should be split because there is 6 gpu
253
        self.commands8 = ["echo 1", "echo 2", "echo 3", "echo 4", "echo 5", "echo 6", "echo 7", "echo 8"]
254
        job_generator = HadesJobGenerator(self.queue, self.commands8)
255
        self.pbs8 = job_generator.pbs_list
256
257
    def test_generate_pbs_ppn(self):
258
        assert_true("ppn={}".format(len(self.commands4)) in str(self.pbs4[0]))
259
260
    def test_generate_pbs_no_gpus_used(self):
261
        # Hades use ppn instead og the gpus flag and breaks if gpus is there
262
        assert_false("gpus=" in str(self.pbs4[0]))
263
264
    def test_pbs_split_1_job(self):
265
        assert_equal(len(self.pbs4), 1)
266
267
    def test_pbs_split_2_job(self):
268
        assert_equal(len(self.pbs8), 2)
269
270
    def test_pbs_split_2_job_nb_commands(self):
271
        assert_true("ppn=6" in str(self.pbs8[0]))
272
        assert_true("ppn=2" in str(self.pbs8[1]))
273
274
class TestSlurmQueue(object):
275
276
    def setUp(self):
277
        self.walltime = "10:00"
278
        self.cores = 42
279
        self.mem_per_node = 32
280
        self.nb_cores_per_node = 1
281
        self.nb_gpus_per_node = 2
282
        self.queue = Queue("slurm", "mila", self.walltime, self.nb_cores_per_node, self.nb_gpus_per_node, self.mem_per_node)
283
284
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
285
        job_generator = SlurmJobGenerator(self.queue, self.commands)
286
        self.pbs = job_generator.pbs_list
287
288
    def test_ppn_ncpus(self):
289
        assert_true("ppn" not in str(self.pbs[0]))
290
        assert_true("ncpus" in str(self.pbs[0]))
291
292
    def test_gpus_naccelerators(self):
293
        assert_true("gpus" not in str(self.pbs[0]))
294
        assert_true("naccelerators" in str(self.pbs[0]))
295
296
class TestJobGeneratorFactory(object):
297
298
    def setUp(self):
299
        self._home_backup = os.environ['HOME']
300
        os.environ['HOME'] = tempfile.mkdtemp()
301
302
        self.rap_filename = os.path.join(os.environ['HOME'], ".default_rap")
303
        if os.path.isfile(self.rap_filename):
304
            raise Exception("Test fail: {} should not be there.".format(self.rap_filename))
305
        else:
306
            self.rapid = 'asd-123-ab'
307
            with open(self.rap_filename, 'w+') as rap_file:
308
                rap_file.write(self.rapid)
309
310
    def tearDown(self):
311
        shutil.rmtree(os.environ['HOME'])
312
        os.environ['HOME'] = self._home_backup
313
314
    def _test_job_generator_factory(self, cluster_name, job_generator_class):
315
        q = Queue("test", cluster_name, 1, 1, 1, 1)
316
        job_generator = job_generator_factory(q, [], cluster_name=cluster_name)
317
        assert_true(isinstance(job_generator, job_generator_class))
318
        assert_true(type(job_generator) is job_generator_class)
319
320
    def test_job_generator_factory(self):
321
        clusters = [("guillimin", GuilliminJobGenerator),
322
                    ("mammouth", MammouthJobGenerator),
323
                    ("helios", HeliosJobGenerator),
324
                    ("hades", HadesJobGenerator),
325
                    (None, JobGenerator)]
326
327
        for cluster_name, job_generator_class in clusters:
328
            yield self._test_job_generator_factory, cluster_name, job_generator_class
329