Completed: Pull Request — master (#167), created by unknown, took 28s

test_main_launch_with_gpus_command()   Rating: A

Complexity:   Conditions 1
Size:         Total Lines 9
Duplication:  Lines 0, Ratio 0 %
Importance:   Changes 3, Bugs 0, Features 0

Metric               Value
cc   (Conditions)    1
c    (Changes)       3
b    (Bugs)          0
f    (Features)      0
dl   (Dup. Lines)    0
loc  (Total Lines)   9
rs                   9.6666
import os
import unittest
import tempfile
import shutil
from os.path import join as pjoin, abspath
from mock import patch
from subprocess import call
import subprocess
from nose.tools import assert_true, assert_equal
from smartdispatch import smartdispatch_script
import six
import sys
import traceback

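# Decorator used below: when the wrapped test raises `exception`, it is re-raised
# as a generic Exception carrying `new_message`, chained to the original exception
# via six.reraise so the original traceback still shows up in the test report.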
def rethrow_exception(exception, new_message):

    def func_wrapper(func):

        def test_func(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except exception:
                orig_exc_type, orig_exc_value, orig_exc_traceback = sys.exc_info()
                new_exc = Exception(new_message)
                new_exc.reraised = True
                new_exc.__cause__ = orig_exc_value

                new_traceback = orig_exc_traceback
                six.reraise(type(new_exc), new_exc, new_traceback)

        return test_func
    return func_wrapper

class TestSmartdispatcher(unittest.TestCase):

    def setUp(self):
        self.testing_dir = tempfile.mkdtemp()
        self.logs_dir = os.path.join(self.testing_dir, 'SMART_DISPATCH_LOGS')

        self.folded_commands = 'echo "[1 2 3 4]" "[6 7 8]" "[9 0]"'
        self.commands = ["echo 1 6 9", "echo 1 6 0", "echo 1 7 9", "echo 1 7 0", "echo 1 8 9", "echo 1 8 0",
                         "echo 2 6 9", "echo 2 6 0", "echo 2 7 9", "echo 2 7 0", "echo 2 8 9", "echo 2 8 0",
                         "echo 3 6 9", "echo 3 6 0", "echo 3 7 9", "echo 3 7 0", "echo 3 8 9", "echo 3 8 0",
                         "echo 4 6 9", "echo 4 6 0", "echo 4 7 9", "echo 4 7 0", "echo 4 8 9", "echo 4 8 0"]
        self.nb_commands = len(self.commands)

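        # Note: the folded command above uses smart-dispatch's bracket syntax; the
        # expected expansion is the cross product of the bracketed lists, which is
        # exactly `self.commands`. Roughly (illustration only, not how the script
        # is invoked in these tests):
        #
        #     from itertools import product
        #     expanded = ["echo {} {} {}".format(a, b, c)
        #                 for a, b, c in product("1234", "678", "90")]
        #     assert expanded == self.commands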
        scripts_path = abspath(pjoin(os.path.dirname(__file__), os.pardir, "scripts"))
        self.smart_dispatch_command = '{} -C 1 -G 1 -q test -t 5:00 -x'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launch_command = "{0} launch {1}".format(self.smart_dispatch_command, self.folded_commands)
        self.resume_command = "{0} resume {{0}}".format(self.smart_dispatch_command)

        self.smart_dispatch_launcher_command = '{} -C 1 -G 1 -q test -t 5:00'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launcher_command = "{0} launch {1}".format(self.smart_dispatch_launcher_command, self.folded_commands)

        smart_dispatch_command_with_pool = '{} --pool 10 -C 1 -G 1 -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launch_command_with_pool = smart_dispatch_command_with_pool.format('launch ' + self.folded_commands)
        self.nb_workers = 10

        smart_dispatch_command_with_cores = '{} -C 1 -G 1 -c {{cores}} -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launch_command_with_cores = smart_dispatch_command_with_cores.format('launch ' + self.folded_commands, cores='{cores}')

        smart_dispatch_command_with_gpus = '{} -C 1 -G 1 -g {{gpus}} -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launch_command_with_gpus = smart_dispatch_command_with_gpus.format('launch ' + self.folded_commands, gpus='{gpus}')

        self._cwd = os.getcwd()
        os.chdir(self.testing_dir)

    def tearDown(self):
        os.chdir(self._cwd)
        shutil.rmtree(self.testing_dir)

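    # The launch/resume tests below shell out to the smart-dispatch script with
    # `call(..., shell=True)` and inspect its exit status: 0 for a successful
    # launch/resume, and 2 for rejected resource requests (matching argparse's
    # usual error exit code, though that mapping is an assumption here).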
    def test_main_launch(self):
        # Actual test
        exit_status = call(self.launch_command, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)

        batch_uid = os.listdir(self.logs_dir)[0]
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        assert_equal(len(os.listdir(path_job_commands)), self.nb_commands + 1)

    def test_launch_using_commands_file(self):
        # Actual test
        commands_filename = "commands_to_run.txt"
        with open(commands_filename, 'w') as commands_file:
            commands_file.write("\n".join(self.commands))

        launch_command = self.smart_dispatch_command + " -f {0} launch".format(commands_filename)
        exit_status = call(launch_command, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)

        batch_uid = os.listdir(self.logs_dir)[0]
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        assert_equal(len(os.listdir(path_job_commands)), self.nb_commands + 1)
        assert_equal(open(pjoin(path_job_commands, 'commands.txt')).read(), "\n".join(self.commands) + "\n")

    def test_main_launch_with_pool_of_workers(self):
        # Actual test
        exit_status = call(self.launch_command_with_pool, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)

        batch_uid = os.listdir(self.logs_dir)[0]
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        assert_equal(len(os.listdir(path_job_commands)), self.nb_workers + 1)

    def test_main_launch_with_cores_command(self):
        # Actual test
        exit_status_0 = call(self.launch_command_with_cores.format(cores=0), shell=True)
        exit_status_100 = call(self.launch_command_with_cores.format(cores=100), shell=True)

        # Test validation
        assert_equal(exit_status_0, 2)
        assert_equal(exit_status_100, 2)
        assert_true(os.path.isdir(self.logs_dir))

    def test_main_launch_with_gpus_command(self):
        # Actual test
        exit_status_0 = call(self.launch_command_with_gpus.format(gpus=0), shell=True)
        exit_status_100 = call(self.launch_command_with_gpus.format(gpus=100), shell=True)

        # Test validation
        assert_equal(exit_status_0, 0)
        assert_equal(exit_status_100, 2)
        assert_true(os.path.isdir(self.logs_dir))

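    # The following checks call smartdispatch_script.main() in-process instead of
    # shelling out; @rethrow_exception turns an unexpected SystemExit (or
    # CalledProcessError below) into a more readable test failure.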
    @rethrow_exception(SystemExit, "smartdispatch_script.main() raised SystemExit unexpectedly.")
    def test_gpu_check(self):

        argv = ['-x', '-g', '2', '-G', '1', '-C', '1', '-q', 'random', '-t', '00:00:10', 'launch', 'echo', 'testing123']

        # Test that the check fails.
        with self.assertRaises(SystemExit) as context:
            smartdispatch_script.main(argv=argv)

        self.assertEqual(context.exception.code, 2)

        # Test that the check passes.
        argv[2] = '1'
        smartdispatch_script.main(argv=argv)

    @rethrow_exception(SystemExit, "smartdispatch_script.main() raised SystemExit unexpectedly.")
    def test_cpu_check(self):

        argv = ['-x', '-c', '2', '-C', '1', '-G', '1', '-t', '00:00:10', '-q', 'random', 'launch', 'echo', 'testing123']

        # Test that the check fails.
        with self.assertRaises(SystemExit) as context:
            smartdispatch_script.main(argv=argv)

        self.assertEqual(context.exception.code, 2)

        # Test that the check passes.
        argv[2] = '1'
        smartdispatch_script.main(argv=argv)

    @rethrow_exception(subprocess.CalledProcessError, "smartdispatch_script.main() raised subprocess.CalledProcessError unexpectedly")
    @patch('subprocess.check_output')
    def test_launch_job_check(self, mock_check_output):

        # For this test, we won't call the script directly, since we want to mock subprocess.check_output.
        argv = ['-t', '0:0:1', '-G', '1', '-C', '1', '-q', 'random', 'launch', 'echo', 'testing123']

        # Test that the script runs normally (i.e. the launcher check passes).
        mock_check_output.side_effect = None
        mock_check_output.return_value = ""

        try:
            smartdispatch_script.main(argv=argv)
        except SystemExit:
            self.fail("The launcher had no problem, but the script failed nonetheless.")

        # Test that the check fails when the launcher raises an error.
        mock_check_output.side_effect = subprocess.CalledProcessError(1, 1, "A wild error appeared!")

        with self.assertRaises(SystemExit) as context:
            smartdispatch_script.main(argv=argv)

        self.assertEqual(context.exception.code, 2)

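    # The resume tests rely on the on-disk layout created by `launch`:
    # SMART_DISPATCH_LOGS/<batch_uid>/commands/ holds commands.txt (pending
    # commands), running_commands.txt (commands claimed by a worker) and one
    # job_commands_*.sh PBS file per worker. A plain resume moves running
    # commands back to pending; `--expandPool` leaves them in place and only
    # generates new worker files.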
    def test_main_resume(self):
        # Setup
        call(self.launch_command, shell=True)
        batch_uid = os.listdir(self.logs_dir)[0]

        # Simulate that some commands are in the running state.
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
        commands = open(pending_commands_file).read().strip().split("\n")
        with open(running_commands_file, 'w') as running_commands:
            running_commands.write("\n".join(commands[::2]) + "\n")
        with open(pending_commands_file, 'w') as pending_commands:
            pending_commands.write("\n".join(commands[1::2]) + "\n")

        # Actual test (should move running commands back to pending).
        exit_status = call(self.resume_command.format(batch_uid), shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)
        assert_equal(len(open(running_commands_file).readlines()), 0)
        assert_equal(len(open(pending_commands_file).readlines()), len(commands))

        # Test when batch_uid is a path instead of a jobname.
        # Setup
        batch_uid = os.path.join(self.logs_dir, os.listdir(self.logs_dir)[0])

        # Simulate that some commands are in the running state.
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
        commands = open(pending_commands_file).read().strip().split("\n")
        with open(running_commands_file, 'w') as running_commands:
            running_commands.write("\n".join(commands[::2]) + "\n")
        with open(pending_commands_file, 'w') as pending_commands:
            pending_commands.write("\n".join(commands[1::2]) + "\n")

        # Actual test (should move running commands back to pending).
        exit_status = call(self.resume_command.format(batch_uid), shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)
        assert_equal(len(open(running_commands_file).readlines()), 0)
        assert_equal(len(open(pending_commands_file).readlines()), len(commands))

    def test_main_resume_by_expanding_pool_default(self):
        # Create SMART_DISPATCH_LOGS structure.
        call(self.launch_command, shell=True)
        batch_uid = os.listdir(self.logs_dir)[0]

        # Simulate that some commands are in the running state.
        nb_commands_files = 2  # 'commands.txt' and 'running_commands.txt'
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
        commands = open(pending_commands_file).read().strip().split("\n")
        with open(running_commands_file, 'w') as running_commands:
            running_commands.write("\n".join(commands[::2]) + "\n")
        with open(pending_commands_file, 'w') as pending_commands:
            pending_commands.write("\n".join(commands[1::2]) + "\n")

        # Remove PBS files so we can check that new ones are going to be created.
        for f in os.listdir(path_job_commands):
            if f.startswith('job_commands_') and f.endswith('.sh'):
                os.remove(pjoin(path_job_commands, f))

        # Should NOT move running commands back to pending but should add new workers.
        command_line = self.resume_command.format(batch_uid)
        command_line += " --expandPool"
        exit_status = call(command_line, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_equal(len(open(running_commands_file).readlines()), len(commands[::2]))
        assert_equal(len(open(pending_commands_file).readlines()), len(commands[1::2]))

        nb_job_commands_files = len(os.listdir(path_job_commands))
        assert_equal(nb_job_commands_files - nb_commands_files, len(commands[1::2]))

    def test_main_resume_by_expanding_pool(self):
        # Create SMART_DISPATCH_LOGS structure.
        call(self.launch_command, shell=True)
        batch_uid = os.listdir(self.logs_dir)[0]

        # Simulate that some commands are in the running state.
        nb_commands_files = 2  # 'commands.txt' and 'running_commands.txt'
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
        commands = open(pending_commands_file).read().strip().split("\n")
        with open(running_commands_file, 'w') as running_commands:
            running_commands.write("\n".join(commands[::2]) + "\n")
        with open(pending_commands_file, 'w') as pending_commands:
            pending_commands.write("\n".join(commands[1::2]) + "\n")

        # Remove PBS files so we can check that new ones are going to be created.
        for f in os.listdir(path_job_commands):
            if f.startswith('job_commands_') and f.endswith('.sh'):
                os.remove(pjoin(path_job_commands, f))

        # Should NOT move running commands back to pending but should add new workers.
        nb_workers_to_add = 3
        command_line = self.resume_command.format(batch_uid)
        command_line += " --expandPool {}".format(nb_workers_to_add)
        exit_status = call(command_line, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_equal(len(open(running_commands_file).readlines()), len(commands[::2]))
        assert_equal(len(open(pending_commands_file).readlines()), len(commands[1::2]))

        nb_job_commands_files = len(os.listdir(path_job_commands))
        assert_equal(nb_job_commands_files - nb_commands_files, nb_workers_to_add)
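
# A possible local invocation for this test module (an assumption; the project's
# actual test-runner configuration is not shown here) is nose, which provides the
# assert_true/assert_equal helpers imported above, e.g.:
#
#     nosetests -v <path to this test module>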
308