Completed
Pull Request — master (#167)
by
unknown
35s
created

test_main_launch_with_gpus_command()   A

Complexity

Conditions 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 1
c 3
b 0
f 0
dl 0
loc 9
rs 9.6666
1
import os
2
import unittest
3
import tempfile
4
import shutil
5
from os.path import join as pjoin, abspath
6
from mock import patch
7
from subprocess import call
8
import subprocess
9
from nose.tools import assert_true, assert_equal
10
from smartdispatch import smartdispatch_script
11
import six
12
from smartdispatch import utils
13
14
class TestSmartdispatcher(unittest.TestCase):
15
16
    def setUp(self):
        """Create a scratch working directory and build the command lines used by the tests.

        The process is moved into a fresh temporary directory; the previous
        working directory is remembered in ``self._cwd``.
        """
        self.testing_dir = tempfile.mkdtemp()
        self.logs_dir = os.path.join(self.testing_dir, 'SMART_DISPATCH_LOGS')

        self.folded_commands = 'echo "[1 2 3 4]" "[6 7 8]" "[9 0]"'
        # Every combination of the bracketed values above, the rightmost group varying fastest.
        self.commands = ["echo {} {} {}".format(first, second, third)
                         for first in (1, 2, 3, 4)
                         for second in (6, 7, 8)
                         for third in (9, 0)]
        self.nb_commands = len(self.commands)

        scripts_dir = abspath(pjoin(os.path.dirname(__file__), os.pardir, "scripts"))
        executable = pjoin(scripts_dir, 'smart-dispatch')

        # Base command line (with -x) and the launch/resume variants derived from it.
        # The resume variant keeps a literal "{0}" placeholder to be filled by each test.
        self.smart_dispatch_command = '{} -C 1 -G 1 -q test -t 5:00 -x'.format(executable)
        self.launch_command = "{0} launch {1}".format(self.smart_dispatch_command, self.folded_commands)
        self.resume_command = "{0} resume {{0}}".format(self.smart_dispatch_command)

        # Same options as the base command line, but without the trailing -x flag.
        self.smart_dispatch_launcher_command = '{} -C 1 -G 1 -q test -t 5:00'.format(executable)
        self.launcher_command = "{0} launch {1}".format(self.smart_dispatch_launcher_command, self.folded_commands)

        launch_arguments = 'launch ' + self.folded_commands

        # Launch through a pool of workers.
        self.nb_workers = 10
        pool_template = '{} --pool 10 -C 1 -G 1 -q test -t 5:00 -x {{0}}'.format(executable)
        self.launch_command_with_pool = pool_template.format(launch_arguments)

        # The "{cores}" placeholder is deliberately kept so that tests can fill it in.
        cores_template = '{} -C 1 -G 1 -c {{cores}} -q test -t 5:00 -x {{0}}'.format(executable)
        self.launch_command_with_cores = cores_template.format(launch_arguments, cores='{cores}')

        # The "{gpus}" placeholder is deliberately kept so that tests can fill it in.
        gpus_template = '{} -C 1 -G 1 -g {{gpus}} -q test -t 5:00 -x {{0}}'.format(executable)
        self.launch_command_with_gpus = gpus_template.format(launch_arguments, gpus='{gpus}')

        self._cwd = os.getcwd()
        os.chdir(self.testing_dir)
47
48
    def tearDown(self):
        """Go back to the working directory saved by ``setUp`` and delete the scratch directory."""
        previous_cwd, scratch_dir = self._cwd, self.testing_dir

        # Leave the scratch directory before removing it.
        os.chdir(previous_cwd)
        shutil.rmtree(scratch_dir)
51
52
    def test_main_launch(self):
        """Run the launch command and check the layout of the logs directory it produces."""
        # Actual test
        status = call(self.launch_command, shell=True)

        # Test validation: success, and exactly one batch directory was created.
        assert_equal(status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        batches = os.listdir(self.logs_dir)
        assert_equal(len(batches), 1)

        # One more entry than there are commands is expected
        # (presumably one script per command plus 'commands.txt' -- TODO confirm).
        commands_dir = os.path.join(self.logs_dir, batches[0], "commands")
        assert_equal(len(os.listdir(commands_dir)), self.nb_commands + 1)
64
65
    def test_launch_using_commands_file(self):
        """Launch with commands read from a file (``-f``) and check the resulting logs.

        The commands file is written in the current working directory, which
        ``setUp`` has switched to the scratch directory.
        """
        # Actual test
        commands_filename = "commands_to_run.txt"
        # Close the file explicitly so its content is flushed to disk before
        # the subprocess below reads it.
        with open(commands_filename, 'w') as commands_file:
            commands_file.write("\n".join(self.commands))

        launch_command = self.smart_dispatch_command + " -f {0} launch".format(commands_filename)
        exit_status = call(launch_command, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)

        batch_uid = os.listdir(self.logs_dir)[0]
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        assert_equal(len(os.listdir(path_job_commands)), self.nb_commands + 1)

        # The recorded commands must match the input, one per line, newline-terminated.
        with open(pjoin(path_job_commands, 'commands.txt')) as recorded_commands:
            assert_equal(recorded_commands.read(), "\n".join(self.commands) + "\n")
82
83
    def test_main_launch_with_pool_of_workers(self):
        """Run the ``--pool`` launch command and check the layout of the logs directory."""
        # Actual test
        status = call(self.launch_command_with_pool, shell=True)

        # Test validation: success, and exactly one batch directory was created.
        assert_equal(status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        batches = os.listdir(self.logs_dir)
        assert_equal(len(batches), 1)

        # One more entry than there are workers is expected
        # (presumably one script per worker plus 'commands.txt' -- TODO confirm).
        commands_dir = os.path.join(self.logs_dir, batches[0], "commands")
        assert_equal(len(os.listdir(commands_dir)), self.nb_workers + 1)
95
96
    def test_main_launch_with_cores_command(self):
        """Launching with ``-c 0`` or ``-c 100`` must exit with status 2."""
        # Actual test: run both command lines before validating anything.
        statuses = [call(self.launch_command_with_cores.format(cores=nb_cores), shell=True)
                    for nb_cores in (0, 100)]

        # Test validation
        for status in statuses:
            assert_equal(status, 2)
        assert_true(os.path.isdir(self.logs_dir))
105
106
    def test_main_launch_with_gpus_command(self):
        """Launching with ``-g 0`` must succeed while ``-g 100`` must exit with status 2."""
        # Actual test: run both command lines before validating anything.
        status_without_gpus = call(self.launch_command_with_gpus.format(gpus=0), shell=True)
        status_too_many_gpus = call(self.launch_command_with_gpus.format(gpus=100), shell=True)

        # Test validation
        assert_equal(status_without_gpus, 0)
        assert_equal(status_too_many_gpus, 2)
        assert_true(os.path.isdir(self.logs_dir))
115
116
    @utils.rethrow_exception(SystemExit, "smartdispatch_script.main() raised SystemExit unexpectedly.")
    def test_gpu_check(self):
        """Check which ``-g`` / ``-G`` combinations make ``smartdispatch_script.main`` exit.

        Rejected combinations must raise ``SystemExit`` with code 2; accepted
        ones must return normally (any other ``SystemExit`` is handed to the
        ``rethrow_exception`` decorator).
        """
        # argv[2] holds the value of -g and argv[4] the value of -G.
        argv = ['-x', '-g', '2', '-G', '1', '-C', '1', '-q', 'random', '-t', '00:00:10', 'launch', 'echo', 'testing123']

        # Test if the check fails.
        with self.assertRaises(SystemExit) as context:
            smartdispatch_script.main(argv=argv)

        # assertEqual, not assertTrue: assertTrue(code, 2) would take 2 as the
        # failure message and never compare the exit code.
        self.assertEqual(context.exception.code, 2)

        # Test if the check passes.
        argv[2] = '1'
        smartdispatch_script.main(argv=argv)

        # Test if we don't have GPUs (and it is specified in the script).
        argv[2] = '0'
        argv[4] = '0'
        smartdispatch_script.main(argv=argv)

        # We don't have GPUs, but the user specifies 1 anyway.
        argv[2] = '1'
        with self.assertRaises(SystemExit) as context:
            smartdispatch_script.main(argv=argv)
        self.assertEqual(context.exception.code, 2)

        # Test if the user didn't specify anything.
        argv = ['-x', '-C', '1', '-q', 'random', '-t', '00:00:10', 'launch', 'echo', 'testing123']
        smartdispatch_script.main(argv=argv)
145
146
    @utils.rethrow_exception(SystemExit, "smartdispatch_script.main() raised SystemExit unexpectedly.")
    def test_cpu_check(self):
        """Check that ``-c 2`` with ``-C 1`` makes ``main`` exit with code 2 while ``-c 1`` is accepted."""
        # argv[2] holds the value of -c.
        argv = ['-x', '-c', '2', '-C', '1', '-G', '1', '-t', '00:00:10', '-q', 'random', 'launch', 'echo', 'testing123']

        # Test if the check fails.
        with self.assertRaises(SystemExit) as context:
            smartdispatch_script.main(argv=argv)

        # assertEqual, not assertTrue: assertTrue(code, 2) would take 2 as the
        # failure message and never compare the exit code.
        self.assertEqual(context.exception.code, 2)

        # Test if the check passes.
        argv[2] = '1'
        smartdispatch_script.main(argv=argv)
160
161
    @utils.rethrow_exception(subprocess.CalledProcessError, "smartdispatch_script.main() raised subprocess.CalledProcessError unexpectedly")
    @patch('subprocess.check_output')
    def test_launch_job_check(self, mock_check_output):
        """Check how ``main`` reacts when the mocked ``subprocess.check_output`` succeeds or fails.

        Parameters
        ----------
        mock_check_output : mock
            Replacement for ``subprocess.check_output`` injected by ``patch``.
        """
        # For this test, we won't call the script directly, since we want to mock subprocess.check_output.
        argv = ['-t', '0:0:1', '-G', '1', '-C', '1', '-q', 'random', 'launch', 'echo', 'testing123']

        # Test if the check passes (i.e. the script runs normally).
        mock_check_output.side_effect = None
        mock_check_output.return_value = ""

        try:
            smartdispatch_script.main(argv=argv)
        except SystemExit:
            self.fail("The launcher had no problem, but the script failed nonetheless.")

        # Test if the check fails.
        mock_check_output.side_effect = subprocess.CalledProcessError(1, 1, "A wild error appeared!")

        with self.assertRaises(SystemExit) as context:
            smartdispatch_script.main(argv=argv)

        # Checked after the `with` block: a statement placed after the raising
        # call inside the block is never executed. assertEqual is required to
        # actually compare the exit code.
        self.assertEqual(context.exception.code, 2)
183
184
    def test_main_resume(self):
        """Check that ``resume`` moves the running commands back to the pending ones.

        The same scenario is played twice: first with the batch identified by
        its directory name, then by the path to its directory.
        """
        # Setup
        call(self.launch_command, shell=True)
        batch_name = os.listdir(self.logs_dir)[0]

        for batch_uid in (batch_name, os.path.join(self.logs_dir, batch_name)):
            # Simulate that some commands are in the running state.
            path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
            pending_commands_file = pjoin(path_job_commands, "commands.txt")
            running_commands_file = pjoin(path_job_commands, "running_commands.txt")

            with open(pending_commands_file) as pending_commands:
                commands = pending_commands.read().strip().split("\n")

            # Even-indexed commands become "running", odd-indexed ones stay "pending".
            with open(running_commands_file, 'w') as running_commands:
                running_commands.write("\n".join(commands[::2]) + "\n")
            with open(pending_commands_file, 'w') as pending_commands:
                pending_commands.write("\n".join(commands[1::2]) + "\n")

            # Actual test (should move running commands back to pending).
            exit_status = call(self.resume_command.format(batch_uid), shell=True)

            # Test validation
            assert_equal(exit_status, 0)
            assert_true(os.path.isdir(self.logs_dir))
            assert_equal(len(os.listdir(self.logs_dir)), 1)

            with open(running_commands_file) as running_commands:
                assert_equal(len(running_commands.readlines()), 0)
            with open(pending_commands_file) as pending_commands:
                assert_equal(len(pending_commands.readlines()), len(commands))
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
232
233
    def test_main_resume_by_expanding_pool_default(self):
        """Check ``resume --expandPool`` without an explicit number of workers.

        Running and pending commands must be left untouched, and the number of
        new files created in the commands directory must equal the number of
        pending commands.
        """
        # Create SMART_DISPATCH_LOGS structure.
        call(self.launch_command, shell=True)
        batch_uid = os.listdir(self.logs_dir)[0]

        # Simulate that some commands are in the running state.
        nb_commands_files = 2  # 'commands.txt' and 'running_commands.txt'
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")

        with open(pending_commands_file) as pending_commands:
            commands = pending_commands.read().strip().split("\n")
        running, pending = commands[::2], commands[1::2]

        with open(running_commands_file, 'w') as running_commands:
            running_commands.write("\n".join(running) + "\n")
        with open(pending_commands_file, 'w') as pending_commands:
            pending_commands.write("\n".join(pending) + "\n")

        # Remove PBS files so we can check that new ones are going to be created.
        for filename in os.listdir(path_job_commands):
            if filename.startswith('job_commands_') and filename.endswith('.sh'):
                os.remove(pjoin(path_job_commands, filename))

        # Should NOT move running commands back to pending but should add new workers.
        command_line = self.resume_command.format(batch_uid)
        command_line += " --expandPool"
        exit_status = call(command_line, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        with open(running_commands_file) as running_commands:
            assert_equal(len(running_commands.readlines()), len(running))
        with open(pending_commands_file) as pending_commands:
            assert_equal(len(pending_commands.readlines()), len(pending))

        nb_job_commands_files = len(os.listdir(path_job_commands))
        assert_equal(nb_job_commands_files - nb_commands_files, len(pending))
266
267
    def test_main_resume_by_expanding_pool(self):
        """Check ``resume --expandPool N`` with an explicit number of workers.

        Running and pending commands must be left untouched, and exactly ``N``
        new files must be created in the commands directory.
        """
        # Create SMART_DISPATCH_LOGS structure.
        call(self.launch_command, shell=True)
        batch_uid = os.listdir(self.logs_dir)[0]

        # Simulate that some commands are in the running state.
        nb_commands_files = 2  # 'commands.txt' and 'running_commands.txt'
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")

        with open(pending_commands_file) as pending_commands:
            commands = pending_commands.read().strip().split("\n")
        running, pending = commands[::2], commands[1::2]

        with open(running_commands_file, 'w') as running_commands:
            running_commands.write("\n".join(running) + "\n")
        with open(pending_commands_file, 'w') as pending_commands:
            pending_commands.write("\n".join(pending) + "\n")

        # Remove PBS files so we can check that new ones are going to be created.
        for filename in os.listdir(path_job_commands):
            if filename.startswith('job_commands_') and filename.endswith('.sh'):
                os.remove(pjoin(path_job_commands, filename))

        # Should NOT move running commands back to pending but should add new workers.
        nb_workers_to_add = 3
        command_line = self.resume_command.format(batch_uid)
        command_line += " --expandPool {}".format(nb_workers_to_add)
        exit_status = call(command_line, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        with open(running_commands_file) as running_commands:
            assert_equal(len(running_commands.readlines()), len(running))
        with open(pending_commands_file) as pending_commands:
            assert_equal(len(pending_commands.readlines()), len(pending))

        nb_job_commands_files = len(os.listdir(path_job_commands))
        assert_equal(nb_job_commands_files - nb_commands_files, nb_workers_to_add)
301