Completed
Pull Request — master (#153) by unknown, created 01:05

TestJobGenerator.setUp()   A

Complexity
    Conditions: 1

Size
    Total Lines: 16

Duplication
    Lines: 0
    Ratio: 0 %

Importance
    Changes: 3
    Bugs: 0    Features: 0

Metric    Value
cc        1
c         3
b         0
f         0
dl        0
loc       16
rs        9.4285
from nose.tools import assert_true, assert_false, assert_equal, assert_raises

import os
import tempfile
import shutil
from smartdispatch.queue import Queue
from smartdispatch.job_generator import JobGenerator, job_generator_factory
from smartdispatch.job_generator import HeliosJobGenerator, HadesJobGenerator
from smartdispatch.job_generator import GuilliminJobGenerator, MammouthJobGenerator


class TestJobGenerator(object):
    pbs_flags = ['-lfeature=k80', '-lwalltime=42:42', '-lnodes=6:gpus=66', '-m', '-A123-asd-11', '-t10,20,30']

    def setUp(self):
        self.testing_dir = tempfile.mkdtemp()
        self.cluster_name = "skynet"
        self.name = "9000@hal"
        self.walltime = "10:00"
        self.cores = 42
        self.gpus = 42
        self.mem_per_node = 32
        self.modules = ["cuda", "python"]

        self.queue = Queue(self.name, self.cluster_name, self.walltime, self.cores, 0, self.mem_per_node, self.modules)
        self.queue_gpu = Queue(self.name, self.cluster_name, self.walltime, self.cores, self.gpus, self.mem_per_node, self.modules)

        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
        self.prolog = ["echo prolog"]
        self.epilog = ["echo epilog"]

    def tearDown(self):
        shutil.rmtree(self.testing_dir)

    def test_generate_pbs(self):
        job_generator = JobGenerator(self.queue, self.commands, prolog=self.prolog, epilog=self.epilog)

        # Test nb_cores_per_command argument
        # Should need one PBS file
        assert_equal(len(job_generator.pbs_list), 1)
        assert_equal(job_generator.pbs_list[0].commands, self.commands)
        assert_equal(job_generator.pbs_list[0].prolog, self.prolog)
        assert_equal(job_generator.pbs_list[0].epilog, self.epilog)

    def test_generate_pbs2_cpu(self):
        # Should need two PBS files
        command_params = {'nb_cores_per_command': self.cores // 2}
        job_generator = JobGenerator(self.queue, self.commands, command_params=command_params)
        assert_equal(len(job_generator.pbs_list), 2)
        assert_equal(job_generator.pbs_list[0].commands, self.commands[:2])
        assert_equal(job_generator.pbs_list[1].commands, self.commands[2:])

    def test_generate_pbs4_cpu(self):
        # Should need four PBS files
        command_params = {'nb_cores_per_command': self.cores}
        job_generator = JobGenerator(self.queue, self.commands, command_params=command_params)
        assert_equal(len(job_generator.pbs_list), 4)
        assert_equal([pbs.commands[0] for pbs in job_generator.pbs_list], self.commands)

        # Since the queue has no GPUs, they should not appear in the PBS resource `nodes`
        assert_true('gpus' not in job_generator.pbs_list[0].resources['nodes'])

        # Test modules to load
        # Check that the modules needed for this queue are included in the PBS file
        assert_equal(job_generator.pbs_list[0].modules, self.modules)

    def test_generate_pbs2_mem(self):
        # Should need two PBS files
        command_params = {'mem_per_command': self.mem_per_node // 2}
        job_generator = JobGenerator(self.queue, self.commands, command_params)
        assert_equal(len(job_generator.pbs_list), 2)
        assert_equal(job_generator.pbs_list[0].commands, self.commands[:2])
        assert_equal(job_generator.pbs_list[1].commands, self.commands[2:])

    def test_generate_pbs4_mem(self):
        # Should need four PBS files
        command_params = {'mem_per_command': self.mem_per_node}
        job_generator = JobGenerator(self.queue, self.commands, command_params)
        assert_equal(len(job_generator.pbs_list), 4)
        assert_equal([pbs.commands[0] for pbs in job_generator.pbs_list], self.commands)

        # Since the queue has no GPUs, they should not appear in the PBS resource `nodes`
        assert_true('gpus' not in job_generator.pbs_list[0].resources['nodes'])

        # Test modules to load
        # Check that the modules needed for this queue are included in the PBS file
        assert_equal(job_generator.pbs_list[0].modules, self.modules)

    def test_generate_pbs2_gpu(self):
        # Test nb_gpus_per_command argument
        # Should need two PBS files
        command_params = {'nb_gpus_per_command': self.gpus // 2}
        job_generator = JobGenerator(self.queue_gpu, self.commands, command_params=command_params)
        assert_equal(len(job_generator.pbs_list), 2)
        assert_equal(job_generator.pbs_list[0].commands, self.commands[:2])
        assert_equal(job_generator.pbs_list[1].commands, self.commands[2:])

    def test_generate_pbs4_gpu(self):
        # Should need four PBS files
        command_params = {'nb_gpus_per_command': self.gpus}
        job_generator = JobGenerator(self.queue_gpu, self.commands, command_params=command_params)
        assert_equal(len(job_generator.pbs_list), 4)
        assert_equal([pbs.commands[0] for pbs in job_generator.pbs_list], self.commands)

        # Since the queue has GPUs, they should appear in the PBS resource `nodes`
        assert_true('gpus' in job_generator.pbs_list[0].resources['nodes'])

        # Test modules to load
        # Check that the modules needed for this queue are included in the PBS file
        assert_equal(job_generator.pbs_list[0].modules, self.modules)

    def test_write_pbs_files(self):
        commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
        command_params = {'nb_cores_per_command': self.cores}
        job_generator = JobGenerator(self.queue, commands, command_params=command_params)
        filenames = job_generator.write_pbs_files(self.testing_dir)
        assert_equal(len(filenames), 4)

    def _test_add_pbs_flags(self, flags):
        job_generator = JobGenerator(self.queue, self.commands)
        job_generator.add_pbs_flags(flags)

        resources = []
        options = []

        for flag in flags:
            if flag.startswith('-l'):
                resources += [flag[:2] + ' ' + flag[2:]]
            elif flag.startswith('-'):
                options += [(flag[:2] + ' ' + flag[2:]).strip()]

        for pbs in job_generator.pbs_list:
            pbs_str = str(pbs)
            for flag in resources:
                assert_equal(pbs_str.count(flag), 1)
                assert_equal(pbs_str.count(flag[:flag.find('=')]), 1)
            for flag in options:
                assert_equal(pbs_str.count(flag), 1)

    def test_add_pbs_flags(self):
        for flag in self.pbs_flags:
            yield self._test_add_pbs_flags, [flag]

        yield self._test_add_pbs_flags, self.pbs_flags

    def test_add_pbs_flags_invalid(self):
        assert_raises(ValueError, self._test_add_pbs_flags, 'weeee')

    def test_add_pbs_flags_invalid_resource(self):
        assert_raises(ValueError, self._test_add_pbs_flags, '-l weeee')


class TestGuilliminQueue(object):
    def setUp(self):
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
        self.queue = Queue("test", "guillimin", "00:01", 1, 1, 1)

        self.bak_env_home_group = os.environ.get('HOME_GROUP')
        if self.bak_env_home_group is not None:
            del os.environ['HOME_GROUP']

    def tearDown(self):
        if self.bak_env_home_group is not None:
            os.environ['HOME_GROUP'] = self.bak_env_home_group

    def test_generate_pbs_no_home(self):
        assert_raises(ValueError, GuilliminJobGenerator, self.queue, self.commands)

    def test_generate_pbs(self):
        os.environ['HOME_GROUP'] = "/path/to/group"
        job_generator = GuilliminJobGenerator(self.queue, self.commands)
        pbs = job_generator.pbs_list[0]
        assert_true("-A" in pbs.options)
        assert_true(pbs.options["-A"] == 'group')


class TestMammouthQueue(object):
    def setUp(self):
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
        self.queue = Queue("qtest@mp2", "mammouth")

    def test_generate_pbs(self):
        job_generator = MammouthJobGenerator(self.queue, self.commands)

        assert_true("ppn=1" in str(job_generator.pbs_list[0]))


class TestHeliosQueue(object):
    def setUp(self):
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
        self.queue = Queue("gpu_8", "helios")

        self._home_backup = os.environ['HOME']
        os.environ['HOME'] = tempfile.mkdtemp()

        self.rap_filename = os.path.join(os.environ['HOME'], ".default_rap")
        if os.path.isfile(self.rap_filename):
            raise Exception("Test fail: {} should not be there.".format(self.rap_filename))
        else:
            self.rapid = 'asd-123-ab'
            with open(self.rap_filename, 'w+') as rap_file:
                rap_file.write(self.rapid)

        self.job_generator = HeliosJobGenerator(self.queue, self.commands)

    def tearDown(self):
        shutil.rmtree(os.environ['HOME'])
        os.environ['HOME'] = self._home_backup

    def test_generate_pbs_invalid_group(self):
        os.remove(self.rap_filename)

        assert_raises(ValueError, HeliosJobGenerator, self.queue, self.commands)

    def test_generate_pbs_valid_group(self):
        pbs = self.job_generator.pbs_list[0]

        assert_equal(pbs.options['-A'], self.rapid)

    def test_generate_pbs_ppn_is_absent(self):
        assert_false("ppn=" in str(self.job_generator.pbs_list[0]))

    def test_generate_pbs_even_nb_commands(self):
        assert_true("gpus=4" in str(self.job_generator.pbs_list[0]))

    def test_generate_pbs_odd_nb_commands(self):
        commands = ["echo 1", "echo 2", "echo 3", "echo 4", "echo 5"]
        job_generator = HeliosJobGenerator(self.queue, commands)

        assert_true("gpus=5" in str(job_generator.pbs_list[0]))


class TestHadesQueue(object):
    def setUp(self):
        self.queue = Queue("@hades", "hades")

        self.commands4 = ["echo 1", "echo 2", "echo 3", "echo 4"]
        job_generator = HadesJobGenerator(self.queue, self.commands4)
        self.pbs4 = job_generator.pbs_list

        # 8 commands chosen because there are 8 cores, but the job should still be split since there are only 6 GPUs
        self.commands8 = ["echo 1", "echo 2", "echo 3", "echo 4", "echo 5", "echo 6", "echo 7", "echo 8"]
        job_generator = HadesJobGenerator(self.queue, self.commands8)
        self.pbs8 = job_generator.pbs_list

    def test_generate_pbs_ppn(self):
        assert_true("ppn={}".format(len(self.commands4)) in str(self.pbs4[0]))

    def test_generate_pbs_no_gpus_used(self):
        # Hades uses ppn instead of the gpus flag and breaks if gpus is specified
        assert_false("gpus=" in str(self.pbs4[0]))

    def test_pbs_split_1_job(self):
        assert_equal(len(self.pbs4), 1)

    def test_pbs_split_2_job(self):
        assert_equal(len(self.pbs8), 2)

    def test_pbs_split_2_job_nb_commands(self):
        assert_true("ppn=6" in str(self.pbs8[0]))
        assert_true("ppn=2" in str(self.pbs8[1]))


class TestJobGeneratorFactory(object):
    def setUp(self):
        self._home_backup = os.environ['HOME']
        os.environ['HOME'] = tempfile.mkdtemp()

        self.rap_filename = os.path.join(os.environ['HOME'], ".default_rap")
        if os.path.isfile(self.rap_filename):
            raise Exception("Test fail: {} should not be there.".format(self.rap_filename))
        else:
            self.rapid = 'asd-123-ab'
            with open(self.rap_filename, 'w+') as rap_file:
                rap_file.write(self.rapid)

    def tearDown(self):
        shutil.rmtree(os.environ['HOME'])
        os.environ['HOME'] = self._home_backup

    def _test_job_generator_factory(self, cluster_name, job_generator_class):
        q = Queue("test", cluster_name, 1, 1, 1, 1)
        job_generator = job_generator_factory(q, [], cluster_name=cluster_name)
        assert_true(isinstance(job_generator, job_generator_class))
        assert_true(type(job_generator) is job_generator_class)

    def test_job_generator_factory(self):
        clusters = [("guillimin", GuilliminJobGenerator),
                    ("mammouth", MammouthJobGenerator),
                    ("helios", HeliosJobGenerator),
                    ("hades", HadesJobGenerator),
                    (None, JobGenerator)]

        for cluster_name, job_generator_class in clusters:
            yield self._test_job_generator_factory, cluster_name, job_generator_class
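
Note: several of these tests (test_add_pbs_flags, test_job_generator_factory) are nose generator tests, so they must be collected by nose rather than plain unittest. A minimal sketch of running them programmatically, assuming this module is saved as test_job_generator.py (the file name is an assumption, not stated in the report):

    import nose

    # Hypothetical runner: adjust the path to wherever this test module lives.
    # nose discovers the yield-based generator tests and calls setUp/tearDown
    # around each generated case.
    nose.run(argv=["nosetests", "test_job_generator.py", "-v"])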