Completed
Push — master ( 8c8fb9...981c14 )
by Marc-Alexandre
7s
created

TestGuilliminQueue   A

Complexity

Total Complexity 6

Size/Duplication

Total Lines 23
Duplicated Lines 0 %
Metric Value
dl 0
loc 23
rs 10
wmc 6

4 Methods

Rating   Name   Duplication   Size   Complexity  
A test_generate_pbs() 0 6 1
A setUp() 0 7 2
A test_generate_pbs_no_home() 0 2 1
A tearDown() 0 3 2
1
from nose.tools import assert_true, assert_false, assert_equal, assert_raises
2
3
import os
4
import tempfile
5
import shutil
6
from smartdispatch.queue import Queue
7
from smartdispatch.job_generator import JobGenerator, job_generator_factory
8
from smartdispatch.job_generator import HeliosJobGenerator, HadesJobGenerator
9
from smartdispatch.job_generator import GuilliminJobGenerator, MammouthJobGenerator
10
11
12
class TestJobGenerator(object):
13
14
    def setUp(self):
15
        self.testing_dir = tempfile.mkdtemp()
16
        self.cluster_name = "skynet"
17
        self.name = "9000@hal"
18
        self.walltime = "10:00"
19
        self.cores = 42
20
        self.gpus = 42
21
        self.mem_per_node = 32
22
        self.modules = ["cuda", "python"]
23
24
        self.queue = Queue(self.name, self.cluster_name, self.walltime, self.cores, 0, self.mem_per_node, self.modules)
25
        self.queue_gpu = Queue(self.name, self.cluster_name, self.walltime, self.cores, self.gpus, self.mem_per_node, self.modules)
26
27
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
28
29
    def tearDown(self):
30
        shutil.rmtree(self.testing_dir)
31
32
    def test_generate_pbs(self):
33
        job_generator = JobGenerator(self.queue, self.commands)
34
35
        # Test nb_cores_per_command argument
36
        # Should needs one PBS file
37
        assert_equal(len(job_generator.pbs_list), 1)
38
        assert_equal(job_generator.pbs_list[0].commands, self.commands)
39
40 View Code Duplication
    def test_generate_pbs2_cpu(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
41
        # Should needs two PBS file
42
        command_params = {'nb_cores_per_command': self.cores // 2}
43
        job_generator = JobGenerator(self.queue, self.commands, command_params)
44
        assert_equal(len(job_generator.pbs_list), 2)
45
        assert_equal(job_generator.pbs_list[0].commands, self.commands[:2])
46
        assert_equal(job_generator.pbs_list[1].commands, self.commands[2:])
47
48
    def test_generate_pbs4_cpu(self):
49
        # Should needs four PBS file
50
        command_params = {'nb_cores_per_command': self.cores}
51
        job_generator = JobGenerator(self.queue, self.commands, command_params)
52
        assert_equal(len(job_generator.pbs_list), 4)
53
        assert_equal([pbs.commands[0] for pbs in job_generator.pbs_list], self.commands)
54
55
        # Since queue has no gpus it should not be specified in PBS resource `nodes`
56
        assert_true('gpus' not in job_generator.pbs_list[0].resources['nodes'])
57
58
        # Test modules to load
59
        # Check if needed modules for this queue are included in the PBS file
60
        assert_equal(job_generator.pbs_list[0].modules, self.modules)
61
62 View Code Duplication
    def test_generate_pbs2_gpu(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
63
        # Test nb_gpus_per_command argument
64
        # Should needs two PBS file
65
        command_params = {'nb_gpus_per_command': self.gpus // 2}
66
        job_generator = JobGenerator(self.queue_gpu, self.commands, command_params)
67
        assert_equal(len(job_generator.pbs_list), 2)
68
        assert_equal(job_generator.pbs_list[0].commands, self.commands[:2])
69
        assert_equal(job_generator.pbs_list[1].commands, self.commands[2:])
70
71
    def test_generate_pbs4_gpu(self):
72
        # Should needs four PBS files
73
        command_params = {'nb_gpus_per_command': self.gpus}
74
        job_generator = JobGenerator(self.queue_gpu, self.commands, command_params)
75
        assert_equal(len(job_generator.pbs_list), 4)
76
        assert_equal([pbs.commands[0] for pbs in job_generator.pbs_list], self.commands)
77
78
        # Since queue has gpus it should be specified in PBS resource `nodes`
79
        assert_true('gpus' in job_generator.pbs_list[0].resources['nodes'])
80
81
        # Test modules to load
82
        # Check if needed modules for this queue are included in the PBS file
83
        assert_equal(job_generator.pbs_list[0].modules, self.modules)
84
85
    def test_write_pbs_files(self):
86
        commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
87
        command_params = {'nb_cores_per_command': self.cores}
88
        job_generator = JobGenerator(self.queue, commands, command_params)
89
        filenames = job_generator.write_pbs_files(self.testing_dir)
90
        assert_equal(len(filenames), 4)
91
92
93
class TestGuilliminQueue(object):
94
95
    def setUp(self):
96
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
97
        self.queue = Queue("test", "guillimin", "00:01", 1, 1, 1)
98
99
        self.bak_env_home_group = os.environ.get('HOME_GROUP')
100
        if self.bak_env_home_group is not None:
101
            del os.environ['HOME_GROUP']
102
103
    def tearDown(self):
104
        if self.bak_env_home_group is not None:
105
            os.environ['HOME_GROUP'] = self.bak_env_home_group
106
107
    def test_generate_pbs_no_home(self):
108
        assert_raises(ValueError, GuilliminJobGenerator, self.queue, self.commands)
109
110
    def test_generate_pbs(self):
111
        os.environ['HOME_GROUP'] = "/path/to/group"
112
        job_generator = GuilliminJobGenerator(self.queue, self.commands)
113
        pbs = job_generator.pbs_list[0]
114
        assert_true("-A" in pbs.options)
115
        assert_true(pbs.options["-A"] == 'group')
116
117
118
class TestMammouthQueue(object):
119
120
    def setUp(self):
121
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
122
        self.queue = Queue("qtest@mp2", "mammouth")
123
124
    def test_generate_pbs(self):
125
        job_generator = MammouthJobGenerator(self.queue, self.commands)
126
127
        assert_true("ppn=1" in str(job_generator.pbs_list[0]))
128
129
130
class TestHeliosQueue(object):
131
132
    def setUp(self):
133
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
134
        self.queue = Queue("gpu_8", "helios")
135
136
        self._home_backup = os.environ['HOME']
137
        os.environ['HOME'] = tempfile.mkdtemp()
138
139
        self.rap_filename = os.path.join(os.environ['HOME'], ".default_rap")
140
        if os.path.isfile(self.rap_filename):
141
            raise Exception("Test fail: {} should not be there.".format(self.rap_filename))
142
        else:
143
            self.rapid = 'asd-123-ab'
144
            with open(self.rap_filename, 'w+') as rap_file:
145
                rap_file.write(self.rapid)
146
147
        self.job_generator = HeliosJobGenerator(self.queue, self.commands)
148
149
    def tearDown(self):
150
        shutil.rmtree(os.environ['HOME'])
151
        os.environ['HOME'] = self._home_backup
152
153
    def test_generate_pbs_invalid_group(self):
154
        os.remove(self.rap_filename)
155
156
        assert_raises(ValueError, HeliosJobGenerator, self.queue, self.commands)
157
158
    def test_generate_pbs_valid_group(self):
159
        pbs = self.job_generator.pbs_list[0]
160
161
        assert_equal(pbs.options['-A'], self.rapid)
162
163
    def test_generate_pbs_ppn_is_absent(self):
164
        assert_false("ppn=" in str(self.job_generator.pbs_list[0]))
165
166
    def test_generate_pbs_even_nb_commands(self):
167
        assert_true("gpus=4" in str(self.job_generator.pbs_list[0]))
168
169
    def test_generate_pbs_odd_nb_commands(self):
170
        commands = ["echo 1", "echo 2", "echo 3", "echo 4", "echo 5"]
171
        job_generator = HeliosJobGenerator(self.queue, commands)
172
173
        assert_true("gpus=5" in str(job_generator.pbs_list[0]))
174
175
176
class TestHadesQueue(object):
177
178
    def setUp(self):
179
        self.queue = Queue("@hades", "hades")
180
181
        self.commands4 = ["echo 1", "echo 2", "echo 3", "echo 4"]
182
        job_generator = HadesJobGenerator(self.queue, self.commands4)
183
        self.pbs4 = job_generator.pbs_list
184
185
        # 8 commands chosen because there is 8 cores but still should be split because there is 6 gpu
186
        self.commands8 = ["echo 1", "echo 2", "echo 3", "echo 4", "echo 5", "echo 6", "echo 7", "echo 8"]
187
        job_generator = HadesJobGenerator(self.queue, self.commands8)
188
        self.pbs8 = job_generator.pbs_list
189
190
    def test_generate_pbs_ppn(self):
191
        assert_true("ppn={}".format(len(self.commands4)) in str(self.pbs4[0]))
192
193
    def test_generate_pbs_no_gpus_used(self):
194
        # Hades use ppn instead og the gpus flag and breaks if gpus is there
195
        assert_false("gpus=" in str(self.pbs4[0]))
196
197
    def test_pbs_split_1_job(self):
198
        assert_equal(len(self.pbs4), 1)
199
200
    def test_pbs_split_2_job(self):
201
        assert_equal(len(self.pbs8), 2)
202
203
    def test_pbs_split_2_job_nb_commands(self):
204
        assert_true("ppn=6" in str(self.pbs8[0]))
205
        assert_true("ppn=2" in str(self.pbs8[1]))
206
207
208
class TestJobGeneratorFactory(object):
209
210
    def setUp(self):
211
        self._home_backup = os.environ['HOME']
212
        os.environ['HOME'] = tempfile.mkdtemp()
213
214
        self.rap_filename = os.path.join(os.environ['HOME'], ".default_rap")
215
        if os.path.isfile(self.rap_filename):
216
            raise Exception("Test fail: {} should not be there.".format(self.rap_filename))
217
        else:
218
            self.rapid = 'asd-123-ab'
219
            with open(self.rap_filename, 'w+') as rap_file:
220
                rap_file.write(self.rapid)
221
222
    def tearDown(self):
223
        shutil.rmtree(os.environ['HOME'])
224
        os.environ['HOME'] = self._home_backup
225
226
    def _test_job_generator_factory(self, cluster_name, job_generator_class):
227
        q = Queue("test", cluster_name, 1, 1, 1, 1)
228
        job_generator = job_generator_factory(q, [], cluster_name=cluster_name)
229
        assert_true(isinstance(job_generator, job_generator_class))
230
        assert_true(type(job_generator) is job_generator_class)
231
232
    def test_job_generator_factory(self):
233
        clusters = [("guillimin", GuilliminJobGenerator),
234
                    ("mammouth", MammouthJobGenerator),
235
                    ("helios", HeliosJobGenerator),
236
                    ("hades", HadesJobGenerator),
237
                    (None, JobGenerator)]
238
239
        for cluster_name, job_generator_class in clusters:
240
            yield self._test_job_generator_factory, cluster_name, job_generator_class
241