Completed
Pull Request — master (#173)
by
unknown
27s
created

TestJobGenerator._test_add_sbatch_flags()   B

Complexity

Conditions 6

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
c 0
b 0
f 0
dl 0
loc 15
rs 8
1
from nose.tools import assert_true, assert_false, assert_equal, assert_raises
2
import os
3
import shutil
4
import tempfile
5
import unittest
6
7
try:
8
    from mock import patch
9
except ImportError:
10
    from unittest.mock import patch
11
12
from smartdispatch.queue import Queue
13
from smartdispatch.job_generator import JobGenerator, job_generator_factory
14
from smartdispatch.job_generator import HeliosJobGenerator, HadesJobGenerator
15
from smartdispatch.job_generator import GuilliminJobGenerator, MammouthJobGenerator
16
from smartdispatch.job_generator import SlurmJobGenerator
17
18
19
class TestJobGenerator(object):
20
    pbs_flags = ['-lfeature=k80', '-lwalltime=42:42', '-lnodes=6:gpus=66', '-m', '-A123-asd-11', '-t10,20,30']
21
    sbatch_flags = ['--qos=high', '--output=file.out', '-Cminmemory']
22
23
    def setUp(self):
24
        self.testing_dir = tempfile.mkdtemp()
25
        self.cluster_name = "skynet"
26
        self.name = "9000@hal"
27
        self.walltime = "10:00"
28
        self.cores = 42
29
        self.gpus = 42
30
        self.mem_per_node = 32
31
        self.modules = ["cuda", "python"]
32
33
        self.queue = Queue(self.name, self.cluster_name, self.walltime, self.cores, 0, self.mem_per_node, self.modules)
34
        self.queue_gpu = Queue(self.name, self.cluster_name, self.walltime, self.cores, self.gpus, self.mem_per_node, self.modules)
35 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
36
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
37
        self.prolog = ["echo prolog"]
38
        self.epilog = ["echo epilog"]
39
40
    def tearDown(self):
41
        shutil.rmtree(self.testing_dir)
42
43
    def test_generate_pbs(self):
44
        job_generator = JobGenerator(self.queue, self.commands, prolog=self.prolog, epilog=self.epilog)
45 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
46
        # Test nb_cores_per_command argument
47
        # Should needs one PBS file
48
        assert_equal(len(job_generator.pbs_list), 1)
49
        assert_equal(job_generator.pbs_list[0].commands, self.commands)
50
        assert_equal(job_generator.pbs_list[0].prolog, self.prolog)
51
        assert_equal(job_generator.pbs_list[0].epilog, self.epilog)
52
53 View Code Duplication
    def test_generate_pbs2_cpu(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
54
        # Should needs two PBS file
55
        command_params = {'nb_cores_per_command': self.cores // 2}
56
        job_generator = JobGenerator(self.queue, self.commands, command_params=command_params)
57
        assert_equal(len(job_generator.pbs_list), 2)
58
        assert_equal(job_generator.pbs_list[0].commands, self.commands[:2])
59
        assert_equal(job_generator.pbs_list[1].commands, self.commands[2:])
60
61
    def test_generate_pbs4_cpu(self):
62
        # Should needs four PBS file
63
        command_params = {'nb_cores_per_command': self.cores}
64
        job_generator = JobGenerator(self.queue, self.commands, command_params=command_params)
65
        assert_equal(len(job_generator.pbs_list), 4)
66
        assert_equal([pbs.commands[0] for pbs in job_generator.pbs_list], self.commands)
67 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
68
        # Since queue has no gpus it should not be specified in PBS resource `nodes`
69
        assert_true('gpus' not in job_generator.pbs_list[0].resources['nodes'])
70
71
        # Test modules to load
72
        # Check if needed modules for this queue are included in the PBS file
73
        assert_equal(job_generator.pbs_list[0].modules, self.modules)
74
75
    def test_generate_pbs2_gpu(self):
76 View Code Duplication
        # Test nb_gpus_per_command argument
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
77
        # Should needs two PBS file
78
        command_params = {'nb_gpus_per_command': self.gpus // 2}
79
        job_generator = JobGenerator(self.queue_gpu, self.commands, command_params=command_params)
80
        assert_equal(len(job_generator.pbs_list), 2)
81
        assert_equal(job_generator.pbs_list[0].commands, self.commands[:2])
82
        assert_equal(job_generator.pbs_list[1].commands, self.commands[2:])
83
84
    def test_generate_pbs4_gpu(self):
85
        # Should needs four PBS files
86
        command_params = {'nb_gpus_per_command': self.gpus}
87
        job_generator = JobGenerator(self.queue_gpu, self.commands, command_params=command_params)
88
        assert_equal(len(job_generator.pbs_list), 4)
89
        assert_equal([pbs.commands[0] for pbs in job_generator.pbs_list], self.commands)
90
91
        # Since queue has gpus it should be specified in PBS resource `nodes`
92
        assert_true('gpus' in job_generator.pbs_list[0].resources['nodes'])
93
94
        # Test modules to load
95
        # Check if needed modules for this queue are included in the PBS file
96
        assert_equal(job_generator.pbs_list[0].modules, self.modules)
97
98
    def test_write_pbs_files(self):
99
        commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
100
        command_params = {'nb_cores_per_command': self.cores}
101
        job_generator = JobGenerator(self.queue, commands, command_params=command_params)
102
        filenames = job_generator.write_pbs_files(self.testing_dir)
103
        assert_equal(len(filenames), 4)
104
105
    def _test_add_pbs_flags(self, flags):
106
        job_generator = JobGenerator(self.queue, self.commands)
107
        job_generator.add_pbs_flags(flags)
108
109
        resources = []
110
        options = []
111
112
        for flag in flags:
113
            if flag.startswith('-l'):
114
                resources += [flag[:2] + ' ' + flag[2:]]
115
            elif flag.startswith('-'):
116
                options += [(flag[:2] + ' ' + flag[2:]).strip()]
117
118
        for pbs in job_generator.pbs_list:
119
            pbs_str = pbs.__str__()
120
            for flag in resources:
121
                assert_equal(pbs_str.count(flag), 1)
122
                assert_equal(pbs_str.count(flag[:flag.find('=')]), 1)
123
            for flag in options:
124
                assert_equal(pbs_str.count(flag), 1)
125
126
    def test_add_pbs_flags(self):
127
        for flag in self.pbs_flags:
128
            yield self._test_add_pbs_flags, [flag]
129
130
        yield self._test_add_pbs_flags, self.pbs_flags
131
132
    def test_add_pbs_flags_invalid(self):
133
        assert_raises(ValueError, self._test_add_pbs_flags, 'weeee')
134
135
    def test_add_pbs_flags_invalid_resource(self):
136
        assert_raises(ValueError, self._test_add_pbs_flags, '-l weeee')
137
138
    def _test_add_sbatch_flags(self, flags):
139
        job_generator = JobGenerator(self.queue, self.commands)
140
        job_generator.add_sbatch_flags(flags)
141
        options = []
142
143
        for flag in flags:
144
            if flag.startswith('--'):
145
                options += [flag]
146
            elif flag.startswith('-'):
147
                options += [(flag[:2] + ' ' + flag[2:]).strip()]
148
149
        for pbs in job_generator.pbs_list:
150
            pbs_str = pbs.__str__()
151
            for flag in options:
152
                assert_equal(pbs_str.count(flag), 1)
153
154
    def test_add_sbatch_flags(self):
155
        for flag in self.sbatch_flags:
156
            yield self._test_add_sbatch_flags, [flag]
157
158
        yield self._test_add_sbatch_flags, [flag]
159
160
    def test_add_sbatch_flag_invalid(self):
161
        invalid_flags = ["--qos high", "gpu", "-lfeature=k80"]
162
        for flag in invalid_flags:
163
            assert_raises(ValueError, self._test_add_sbatch_flags, flag)
164
165
class TestGuilliminQueue(object):
166
167
    def setUp(self):
168
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
169
        self.queue = Queue("test", "guillimin", "00:01", 1, 1, 1)
170
171
        self.bak_env_home_group = os.environ.get('HOME_GROUP')
172
        if self.bak_env_home_group is not None:
173
            del os.environ['HOME_GROUP']
174
175
    def tearDown(self):
176
        if self.bak_env_home_group is not None:
177
            os.environ['HOME_GROUP'] = self.bak_env_home_group
178
179
    def test_generate_pbs_no_home(self):
180
        assert_raises(ValueError, GuilliminJobGenerator, self.queue, self.commands)
181
182
    def test_generate_pbs(self):
183
        os.environ['HOME_GROUP'] = "/path/to/group"
184
        job_generator = GuilliminJobGenerator(self.queue, self.commands)
185
        pbs = job_generator.pbs_list[0]
186
        assert_true("-A" in pbs.options)
187
        assert_true(pbs.options["-A"] == 'group')
188
189
190
class TestMammouthQueue(object):
    """Checks specific to ``MammouthJobGenerator``."""

    def setUp(self):
        """Prepare four commands and a queue on the mammouth cluster."""
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
        self.queue = Queue("qtest@mp2", "mammouth")

    def test_generate_pbs(self):
        """The first generated PBS file must contain ``ppn=1``."""
        first_pbs = MammouthJobGenerator(self.queue, self.commands).pbs_list[0]
        rendered = str(first_pbs)

        assert_true("ppn=1" in rendered)
200
201
202
class TestHeliosQueue(object):
203
204
    def setUp(self):
205
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
206
        self.queue = Queue("gpu_8", "helios")
207
208
        self._home_backup = os.environ['HOME']
209
        os.environ['HOME'] = tempfile.mkdtemp()
210
211
        self.rap_filename = os.path.join(os.environ['HOME'], ".default_rap")
212
        if os.path.isfile(self.rap_filename):
213
            raise Exception("Test fail: {} should not be there.".format(self.rap_filename))
214
        else:
215
            self.rapid = 'asd-123-ab'
216
            with open(self.rap_filename, 'w+') as rap_file:
217
                rap_file.write(self.rapid)
218
219
        self.job_generator = HeliosJobGenerator(self.queue, self.commands)
220
221
    def tearDown(self):
222
        shutil.rmtree(os.environ['HOME'])
223
        os.environ['HOME'] = self._home_backup
224
225
    def test_generate_pbs_invalid_group(self):
226
        os.remove(self.rap_filename)
227
228
        assert_raises(ValueError, HeliosJobGenerator, self.queue, self.commands)
229
230
    def test_generate_pbs_valid_group(self):
231
        pbs = self.job_generator.pbs_list[0]
232
233
        assert_equal(pbs.options['-A'], self.rapid)
234
235
    def test_generate_pbs_ppn_is_absent(self):
236
        assert_false("ppn=" in str(self.job_generator.pbs_list[0]))
237
238
    def test_generate_pbs_even_nb_commands(self):
239
        assert_true("gpus=4" in str(self.job_generator.pbs_list[0]))
240
241
    def test_generate_pbs_odd_nb_commands(self):
242
        commands = ["echo 1", "echo 2", "echo 3", "echo 4", "echo 5"]
243
        job_generator = HeliosJobGenerator(self.queue, commands)
244
245
        assert_true("gpus=5" in str(job_generator.pbs_list[0]))
246
247
248
class TestHadesQueue(object):
    """Tests for ``HadesJobGenerator`` using a 4-command and an 8-command workload."""

    def setUp(self):
        """Generate the PBS lists for both workloads once per test."""
        self.queue = Queue("@hades", "hades")

        self.commands4 = ["echo 1", "echo 2", "echo 3", "echo 4"]
        self.pbs4 = HadesJobGenerator(self.queue, self.commands4).pbs_list

        # 8 commands were chosen because there are 8 cores, yet the job should
        # still be split because there are only 6 gpus.
        self.commands8 = ["echo 1", "echo 2", "echo 3", "echo 4", "echo 5", "echo 6", "echo 7", "echo 8"]
        self.pbs8 = HadesJobGenerator(self.queue, self.commands8).pbs_list

    def test_generate_pbs_ppn(self):
        """``ppn`` equals the number of commands for the small workload."""
        expected = "ppn={}".format(len(self.commands4))
        assert_true(expected in str(self.pbs4[0]))

    def test_generate_pbs_no_gpus_used(self):
        """Hades uses ppn instead of the gpus flag and breaks if gpus is present."""
        rendered = str(self.pbs4[0])
        assert_false("gpus=" in rendered)

    def test_pbs_split_1_job(self):
        """Four commands fit in a single PBS file."""
        assert_equal(len(self.pbs4), 1)

    def test_pbs_split_2_job(self):
        """Eight commands are split across two PBS files."""
        assert_equal(len(self.pbs8), 2)

    def test_pbs_split_2_job_nb_commands(self):
        """The split puts ``ppn=6`` in the first file and ``ppn=2`` in the second."""
        for index, expected in enumerate(("ppn=6", "ppn=2")):
            assert_true(expected in str(self.pbs8[index]))
278
279
280
class TestSlurmQueue(unittest.TestCase):
    """Compare ``SlurmJobGenerator`` output with and without its cluster-specific rules.

    ``self.pbs`` comes from a regular generator; ``self.dummy_pbs`` comes from
    one whose ``_add_cluster_specific_rules`` was patched into a no-op.
    """

    def setUp(self):
        self.walltime = "10:00"
        self.cores = 42
        self.mem_per_node = 32
        self.nb_cores_per_node = 1
        self.nb_gpus_per_node = 2
        self.queue = Queue("slurm", "mila", self.walltime, self.nb_cores_per_node, self.nb_gpus_per_node, self.mem_per_node)

        self.nb_of_commands = 4
        self.commands = ["echo %d; echo $PBS_JOBID; echo $PBS_WALLTIME" % i
                         for i in range(self.nb_of_commands)]

        self.prolog = ["echo prolog"]
        self.epilog = ["echo $PBS_FILENAME"]

        # Regular generator: cluster-specific rules are applied.
        slurm_generator = SlurmJobGenerator(
            self.queue, self.commands, prolog=self.prolog, epilog=self.epilog)
        self.pbs = slurm_generator.pbs_list

        # Reference generator: the cluster-specific hook is replaced by a no-op.
        with patch.object(SlurmJobGenerator,'_add_cluster_specific_rules', side_effect=lambda: None):
            untouched_generator = SlurmJobGenerator(
                self.queue, self.commands, prolog=self.prolog, epilog=self.epilog)
            self.dummy_pbs = untouched_generator.pbs_list

    def test_ppn_ncpus(self):
        """``ppn`` is replaced by ``ncpus`` once the cluster rules are applied."""
        dummy_text = str(self.dummy_pbs[0])
        slurm_text = str(self.pbs[0])
        assert_true("ppn" in dummy_text)
        assert_true("ncpus" not in dummy_text)
        assert_true("ppn" not in slurm_text)
        assert_true("ncpus" in slurm_text)

    def test_gpus_naccelerators(self):
        """``gpus`` is replaced by ``naccelerators`` once the cluster rules are applied."""
        dummy_text = str(self.dummy_pbs[0])
        slurm_text = str(self.pbs[0])
        assert_true("gpus" in dummy_text)
        assert_true("naccelerators" not in dummy_text)
        assert_true("gpus" not in slurm_text)
        assert_true("naccelerators" in slurm_text)

    def test_queue(self):
        """The ``PBS -q`` line is only present without the cluster rules."""
        dummy_text = str(self.dummy_pbs[0])
        slurm_text = str(self.pbs[0])
        assert_true("PBS -q" in dummy_text)
        assert_true("PBS -q" not in slurm_text)

    def test_export(self):
        """``#PBS -V`` gives way to ``#SBATCH --export=ALL`` with the cluster rules."""
        dummy_text = str(self.dummy_pbs[0])
        slurm_text = str(self.pbs[0])
        assert_true("#PBS -V" in dummy_text)
        assert_true("#PBS -V" not in slurm_text)
        assert_true("#SBATCH --export=ALL" in slurm_text)

    def test_outputs(self):
        """The ``-e``/``-o`` options use ``$PBS_JOBID`` without, and ``%A`` with, the cluster rules."""
        for std in ('-e', '-o'):
            dummy_value = self.dummy_pbs[0].options[std]
            assert_true("$PBS_JOBID" in dummy_value,
                        "$PBS_JOBID should be present in option %s: %s" %
                        (std, dummy_value))

            slurm_value = self.pbs[0].options[std]
            assert_true("$PBS_JOBID" not in slurm_value,
                        "$PBS_JOBID not should be present in option %s: %s" %
                        (std, slurm_value))
            assert_true("%A" in slurm_value,
                        "%%A should be present in option %s: %s" %
                        (std, slurm_value))

    def test_job_id_env_var(self):
        """``$PBS_JOBID`` becomes ``$SLURM_JOB_ID`` once the cluster rules are applied."""
        dummy_text = str(self.dummy_pbs[0])
        self.assertIn("$PBS_JOBID", dummy_text)
        self.assertNotIn("$SLURM_JOB_ID", dummy_text)

        slurm_text = str(self.pbs[0])
        self.assertNotIn("$PBS_JOBID", slurm_text)
        self.assertIn("$SLURM_JOB_ID", slurm_text)

    def test_walltime_env_var(self):
        """``$PBS_WALLTIME`` becomes ``$SBATCH_TIMELIMIT``, which the prolog must define."""
        dummy_text = str(self.dummy_pbs[0])
        self.assertIn("$PBS_WALLTIME", dummy_text)
        self.assertNotIn("$SBATCH_TIMELIMIT", dummy_text)

        slurm_text = str(self.pbs[0])
        self.assertNotIn("$PBS_WALLTIME", slurm_text)
        self.assertIn("$SBATCH_TIMELIMIT", slurm_text)

        # The variable assignment itself only shows up in the prolog built with the cluster rules.
        dummy_prolog = "\n".join(self.dummy_pbs[0].prolog)
        self.assertNotIn("SBATCH_TIMELIMIT=", dummy_prolog)
        slurm_prolog = "\n".join(self.pbs[0].prolog)
        self.assertIn("SBATCH_TIMELIMIT=", slurm_prolog)
359
360
361
class TestJobGeneratorFactory(object):
    """Check that ``job_generator_factory`` returns the generator class matching each cluster."""

    def setUp(self):
        """Point ``HOME`` at a temp dir containing a ``.default_rap`` file."""
        self._home_backup = os.environ['HOME']
        os.environ['HOME'] = tempfile.mkdtemp()

        self.rap_filename = os.path.join(os.environ['HOME'], ".default_rap")
        if os.path.isfile(self.rap_filename):
            raise Exception("Test fail: {} should not be there.".format(self.rap_filename))

        self.rapid = 'asd-123-ab'
        with open(self.rap_filename, 'w+') as rap_file:
            rap_file.write(self.rapid)

    def tearDown(self):
        """Delete the temporary ``HOME`` and restore the original value."""
        temporary_home = os.environ['HOME']
        shutil.rmtree(temporary_home)
        os.environ['HOME'] = self._home_backup

    def _test_job_generator_factory(self, cluster_name, job_generator_class):
        """Assert the factory output for ``cluster_name`` is exactly ``job_generator_class``."""
        queue = Queue("test", cluster_name, 1, 1, 1, 1)
        produced = job_generator_factory(queue, [], cluster_name=cluster_name)
        assert_true(isinstance(produced, job_generator_class))
        # Exact type match: a subclass instance would not be accepted.
        assert_true(type(produced) is job_generator_class)

    def test_job_generator_factory(self):
        """Yield one factory check per known cluster, plus the ``None`` fallback."""
        expected_classes = (
            ("guillimin", GuilliminJobGenerator),
            ("mammouth", MammouthJobGenerator),
            ("helios", HeliosJobGenerator),
            ("hades", HadesJobGenerator),
            (None, JobGenerator),
        )

        for cluster_name, job_generator_class in expected_classes:
            yield self._test_job_generator_factory, cluster_name, job_generator_class
394