Completed
Push — master ( 981c14...d395f4 )
by Marc-Alexandre
01:05
created

TestJobGenerator   A

Complexity

Total Complexity 21

Size/Duplication

Total Lines 113
Duplicated Lines 13.27 %
Metric Value
dl 15
loc 113
rs 10
wmc 21

12 Methods

Rating   Name   Duplication   Size   Complexity  
A test_generate_pbs4_cpu() 0 13 2
A test_add_pbs_flags_invalid() 0 2 1
A test_generate_pbs2_cpu() 7 7 1
A test_write_pbs_files() 0 6 1
A test_generate_pbs4_gpu() 0 13 2
A test_generate_pbs() 0 7 1
A test_generate_pbs2_gpu() 8 8 1
A tearDown() 0 2 1
A setUp() 0 14 1
A test_add_pbs_flags() 0 5 2
B _test_add_pbs_flags() 0 20 7
A test_add_pbs_flags_invalid_resource() 0 2 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
from nose.tools import assert_true, assert_false, assert_equal, assert_raises
2
3
import os
4
import tempfile
5
import shutil
6
from smartdispatch.queue import Queue
7
from smartdispatch.job_generator import JobGenerator, job_generator_factory
8
from smartdispatch.job_generator import HeliosJobGenerator, HadesJobGenerator
9
from smartdispatch.job_generator import GuilliminJobGenerator, MammouthJobGenerator
10
11
12
class TestJobGenerator(object):
13
    pbs_flags = ['-lfeature=k80', '-lwalltime=42:42', '-lnodes=6:gpus=66', '-m', '-A123-asd-11', '-t10,20,30']
14
15
    def setUp(self):
16
        self.testing_dir = tempfile.mkdtemp()
17
        self.cluster_name = "skynet"
18
        self.name = "9000@hal"
19
        self.walltime = "10:00"
20
        self.cores = 42
21
        self.gpus = 42
22
        self.mem_per_node = 32
23
        self.modules = ["cuda", "python"]
24
25
        self.queue = Queue(self.name, self.cluster_name, self.walltime, self.cores, 0, self.mem_per_node, self.modules)
26
        self.queue_gpu = Queue(self.name, self.cluster_name, self.walltime, self.cores, self.gpus, self.mem_per_node, self.modules)
27
28
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
29
30
    def tearDown(self):
31
        shutil.rmtree(self.testing_dir)
32
33
    def test_generate_pbs(self):
34
        job_generator = JobGenerator(self.queue, self.commands)
35
36
        # Test nb_cores_per_command argument
37
        # Should needs one PBS file
38
        assert_equal(len(job_generator.pbs_list), 1)
39
        assert_equal(job_generator.pbs_list[0].commands, self.commands)
40
41 View Code Duplication
    def test_generate_pbs2_cpu(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
42
        # Should needs two PBS file
43
        command_params = {'nb_cores_per_command': self.cores // 2}
44
        job_generator = JobGenerator(self.queue, self.commands, command_params)
45
        assert_equal(len(job_generator.pbs_list), 2)
46
        assert_equal(job_generator.pbs_list[0].commands, self.commands[:2])
47
        assert_equal(job_generator.pbs_list[1].commands, self.commands[2:])
48
49
    def test_generate_pbs4_cpu(self):
50
        # Should needs four PBS file
51
        command_params = {'nb_cores_per_command': self.cores}
52
        job_generator = JobGenerator(self.queue, self.commands, command_params)
53
        assert_equal(len(job_generator.pbs_list), 4)
54
        assert_equal([pbs.commands[0] for pbs in job_generator.pbs_list], self.commands)
55
56
        # Since queue has no gpus it should not be specified in PBS resource `nodes`
57
        assert_true('gpus' not in job_generator.pbs_list[0].resources['nodes'])
58
59
        # Test modules to load
60
        # Check if needed modules for this queue are included in the PBS file
61
        assert_equal(job_generator.pbs_list[0].modules, self.modules)
62
63 View Code Duplication
    def test_generate_pbs2_gpu(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
64
        # Test nb_gpus_per_command argument
65
        # Should needs two PBS file
66
        command_params = {'nb_gpus_per_command': self.gpus // 2}
67
        job_generator = JobGenerator(self.queue_gpu, self.commands, command_params)
68
        assert_equal(len(job_generator.pbs_list), 2)
69
        assert_equal(job_generator.pbs_list[0].commands, self.commands[:2])
70
        assert_equal(job_generator.pbs_list[1].commands, self.commands[2:])
71
72
    def test_generate_pbs4_gpu(self):
73
        # Should needs four PBS files
74
        command_params = {'nb_gpus_per_command': self.gpus}
75
        job_generator = JobGenerator(self.queue_gpu, self.commands, command_params)
76
        assert_equal(len(job_generator.pbs_list), 4)
77
        assert_equal([pbs.commands[0] for pbs in job_generator.pbs_list], self.commands)
78
79
        # Since queue has gpus it should be specified in PBS resource `nodes`
80
        assert_true('gpus' in job_generator.pbs_list[0].resources['nodes'])
81
82
        # Test modules to load
83
        # Check if needed modules for this queue are included in the PBS file
84
        assert_equal(job_generator.pbs_list[0].modules, self.modules)
85
86
    def test_write_pbs_files(self):
87
        commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
88
        command_params = {'nb_cores_per_command': self.cores}
89
        job_generator = JobGenerator(self.queue, commands, command_params)
90
        filenames = job_generator.write_pbs_files(self.testing_dir)
91
        assert_equal(len(filenames), 4)
92
93
    def _test_add_pbs_flags(self, flags):
94
        job_generator = JobGenerator(self.queue, self.commands)
95
        job_generator.add_pbs_flags(flags)
96
97
        resources = []
98
        options = []
99
100
        for flag in flags:
101
            if flag.startswith('-l'):
102
                resources += [flag[:2] + ' ' + flag[2:]]
103
            elif flag.startswith('-'):
104
                options += [(flag[:2] + ' ' + flag[2:]).strip()]
105
106
        for pbs in job_generator.pbs_list:
107
            pbs_str = pbs.__str__()
108
            for flag in resources:
109
                assert_equal(pbs_str.count(flag), 1)
110
                assert_equal(pbs_str.count(flag[:flag.find('=')]), 1)
111
            for flag in options:
112
                assert_equal(pbs_str.count(flag), 1)
113
114
    def test_add_pbs_flags(self):
115
        for flag in self.pbs_flags:
116
            yield self._test_add_pbs_flags, [flag]
117
118
        yield self._test_add_pbs_flags, self.pbs_flags
119
120
    def test_add_pbs_flags_invalid(self):
121
        assert_raises(ValueError, self._test_add_pbs_flags, 'weeee')
122
123
    def test_add_pbs_flags_invalid_resource(self):
124
        assert_raises(ValueError, self._test_add_pbs_flags, '-l weeee')
125
126
127
class TestGuilliminQueue(object):
128
129
    def setUp(self):
130
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
131
        self.queue = Queue("test", "guillimin", "00:01", 1, 1, 1)
132
133
        self.bak_env_home_group = os.environ.get('HOME_GROUP')
134
        if self.bak_env_home_group is not None:
135
            del os.environ['HOME_GROUP']
136
137
    def tearDown(self):
138
        if self.bak_env_home_group is not None:
139
            os.environ['HOME_GROUP'] = self.bak_env_home_group
140
141
    def test_generate_pbs_no_home(self):
142
        assert_raises(ValueError, GuilliminJobGenerator, self.queue, self.commands)
143
144
    def test_generate_pbs(self):
145
        os.environ['HOME_GROUP'] = "/path/to/group"
146
        job_generator = GuilliminJobGenerator(self.queue, self.commands)
147
        pbs = job_generator.pbs_list[0]
148
        assert_true("-A" in pbs.options)
149
        assert_true(pbs.options["-A"] == 'group')
150
151
152
class TestMammouthQueue(object):
153
154
    def setUp(self):
155
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
156
        self.queue = Queue("qtest@mp2", "mammouth")
157
158
    def test_generate_pbs(self):
159
        job_generator = MammouthJobGenerator(self.queue, self.commands)
160
161
        assert_true("ppn=1" in str(job_generator.pbs_list[0]))
162
163
164
class TestHeliosQueue(object):
165
166
    def setUp(self):
167
        self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"]
168
        self.queue = Queue("gpu_8", "helios")
169
170
        self._home_backup = os.environ['HOME']
171
        os.environ['HOME'] = tempfile.mkdtemp()
172
173
        self.rap_filename = os.path.join(os.environ['HOME'], ".default_rap")
174
        if os.path.isfile(self.rap_filename):
175
            raise Exception("Test fail: {} should not be there.".format(self.rap_filename))
176
        else:
177
            self.rapid = 'asd-123-ab'
178
            with open(self.rap_filename, 'w+') as rap_file:
179
                rap_file.write(self.rapid)
180
181
        self.job_generator = HeliosJobGenerator(self.queue, self.commands)
182
183
    def tearDown(self):
184
        shutil.rmtree(os.environ['HOME'])
185
        os.environ['HOME'] = self._home_backup
186
187
    def test_generate_pbs_invalid_group(self):
188
        os.remove(self.rap_filename)
189
190
        assert_raises(ValueError, HeliosJobGenerator, self.queue, self.commands)
191
192
    def test_generate_pbs_valid_group(self):
193
        pbs = self.job_generator.pbs_list[0]
194
195
        assert_equal(pbs.options['-A'], self.rapid)
196
197
    def test_generate_pbs_ppn_is_absent(self):
198
        assert_false("ppn=" in str(self.job_generator.pbs_list[0]))
199
200
    def test_generate_pbs_even_nb_commands(self):
201
        assert_true("gpus=4" in str(self.job_generator.pbs_list[0]))
202
203
    def test_generate_pbs_odd_nb_commands(self):
204
        commands = ["echo 1", "echo 2", "echo 3", "echo 4", "echo 5"]
205
        job_generator = HeliosJobGenerator(self.queue, commands)
206
207
        assert_true("gpus=5" in str(job_generator.pbs_list[0]))
208
209
210
class TestHadesQueue(object):
211
212
    def setUp(self):
213
        self.queue = Queue("@hades", "hades")
214
215
        self.commands4 = ["echo 1", "echo 2", "echo 3", "echo 4"]
216
        job_generator = HadesJobGenerator(self.queue, self.commands4)
217
        self.pbs4 = job_generator.pbs_list
218
219
        # 8 commands chosen because there is 8 cores but still should be split because there is 6 gpu
220
        self.commands8 = ["echo 1", "echo 2", "echo 3", "echo 4", "echo 5", "echo 6", "echo 7", "echo 8"]
221
        job_generator = HadesJobGenerator(self.queue, self.commands8)
222
        self.pbs8 = job_generator.pbs_list
223
224
    def test_generate_pbs_ppn(self):
225
        assert_true("ppn={}".format(len(self.commands4)) in str(self.pbs4[0]))
226
227
    def test_generate_pbs_no_gpus_used(self):
228
        # Hades use ppn instead og the gpus flag and breaks if gpus is there
229
        assert_false("gpus=" in str(self.pbs4[0]))
230
231
    def test_pbs_split_1_job(self):
232
        assert_equal(len(self.pbs4), 1)
233
234
    def test_pbs_split_2_job(self):
235
        assert_equal(len(self.pbs8), 2)
236
237
    def test_pbs_split_2_job_nb_commands(self):
238
        assert_true("ppn=6" in str(self.pbs8[0]))
239
        assert_true("ppn=2" in str(self.pbs8[1]))
240
241
242
class TestJobGeneratorFactory(object):
243
244
    def setUp(self):
245
        self._home_backup = os.environ['HOME']
246
        os.environ['HOME'] = tempfile.mkdtemp()
247
248
        self.rap_filename = os.path.join(os.environ['HOME'], ".default_rap")
249
        if os.path.isfile(self.rap_filename):
250
            raise Exception("Test fail: {} should not be there.".format(self.rap_filename))
251
        else:
252
            self.rapid = 'asd-123-ab'
253
            with open(self.rap_filename, 'w+') as rap_file:
254
                rap_file.write(self.rapid)
255
256
    def tearDown(self):
257
        shutil.rmtree(os.environ['HOME'])
258
        os.environ['HOME'] = self._home_backup
259
260
    def _test_job_generator_factory(self, cluster_name, job_generator_class):
261
        q = Queue("test", cluster_name, 1, 1, 1, 1)
262
        job_generator = job_generator_factory(q, [], cluster_name=cluster_name)
263
        assert_true(isinstance(job_generator, job_generator_class))
264
        assert_true(type(job_generator) is job_generator_class)
265
266
    def test_job_generator_factory(self):
267
        clusters = [("guillimin", GuilliminJobGenerator),
268
                    ("mammouth", MammouthJobGenerator),
269
                    ("helios", HeliosJobGenerator),
270
                    ("hades", HadesJobGenerator),
271
                    (None, JobGenerator)]
272
273
        for cluster_name, job_generator_class in clusters:
274
            yield self._test_job_generator_factory, cluster_name, job_generator_class
275