Completed
Pull Request — master (#167)
by
unknown
41s
created

test_main_launch_with_gpus_command()   A

Complexity

Conditions 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 1
c 2
b 0
f 0
dl 0
loc 9
rs 9.6666
1
import os
2
import unittest
3
import tempfile
4
import shutil
5
from os.path import join as pjoin, abspath
6
from mock import patch
7
from subprocess import call
8
import subprocess
9
from nose.tools import assert_true, assert_equal
10
from smartdispatch import smartdispatch_script
11
import traceback
12
13
class TestSmartdispatcher(unittest.TestCase):
    """Integration tests for the `smart-dispatch` command-line script.

    Each test runs the script through the shell (or calls
    `smartdispatch_script.main` directly when mocking is needed) inside a
    temporary working directory, then inspects the SMART_DISPATCH_LOGS
    folder the script produces.
    """

    def setUp(self):
        # Isolate every test in a throwaway working directory so the
        # SMART_DISPATCH_LOGS folder created by the script never leaks
        # into the real filesystem.
        self.testing_dir = tempfile.mkdtemp()
        self.logs_dir = os.path.join(self.testing_dir, 'SMART_DISPATCH_LOGS')

        # A folded command-line which the script unfolds into the
        # cartesian product listed in `self.commands`.
        self.folded_commands = 'echo "[1 2 3 4]" "[6 7 8]" "[9 0]"'
        self.commands = ["echo 1 6 9", "echo 1 6 0", "echo 1 7 9", "echo 1 7 0", "echo 1 8 9", "echo 1 8 0",
                         "echo 2 6 9", "echo 2 6 0", "echo 2 7 9", "echo 2 7 0", "echo 2 8 9", "echo 2 8 0",
                         "echo 3 6 9", "echo 3 6 0", "echo 3 7 9", "echo 3 7 0", "echo 3 8 9", "echo 3 8 0",
                         "echo 4 6 9", "echo 4 6 0", "echo 4 7 9", "echo 4 7 0", "echo 4 8 9", "echo 4 8 0"]
        self.nb_commands = len(self.commands)

        # Command-line templates exercising the different script options.
        scripts_path = abspath(pjoin(os.path.dirname(__file__), os.pardir, "scripts"))
        self.smart_dispatch_command = '{} -C 1 -G 1 -q test -t 5:00 -x'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launch_command = "{0} launch {1}".format(self.smart_dispatch_command, self.folded_commands)
        self.resume_command = "{0} resume {{0}}".format(self.smart_dispatch_command)

        self.smart_dispatch_launcher_command = '{} -C 1 -G 1 -q test -t 5:00'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launcher_command = "{0} launch {1}".format(self.smart_dispatch_launcher_command, self.folded_commands)

        smart_dispatch_command_with_pool = '{} --pool 10 -C 1 -G 1 -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launch_command_with_pool = smart_dispatch_command_with_pool.format('launch ' + self.folded_commands)
        self.nb_workers = 10

        smart_dispatch_command_with_cores = '{} -C 1 -G 1 -c {{cores}} -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launch_command_with_cores = smart_dispatch_command_with_cores.format('launch ' + self.folded_commands, cores='{cores}')

        smart_dispatch_command_with_gpus = '{} -C 1 -G 1 -g {{gpus}} -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launch_command_with_gpus = smart_dispatch_command_with_gpus.format('launch ' + self.folded_commands, gpus='{gpus}')

        self._cwd = os.getcwd()
        os.chdir(self.testing_dir)

    def tearDown(self):
        os.chdir(self._cwd)
        shutil.rmtree(self.testing_dir)

    def _simulate_running_commands(self, batch_uid):
        """Mark every other command of `batch_uid` as currently running.

        Rewrites 'commands.txt' so that commands[::2] end up in
        'running_commands.txt' while commands[1::2] stay pending.

        Returns (path_job_commands, pending_commands_file,
                 running_commands_file, commands).
        """
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
        with open(pending_commands_file) as pending_commands:
            commands = pending_commands.read().strip().split("\n")
        with open(running_commands_file, 'w') as running_commands:
            running_commands.write("\n".join(commands[::2]) + "\n")
        with open(pending_commands_file, 'w') as pending_commands:
            pending_commands.write("\n".join(commands[1::2]) + "\n")
        return path_job_commands, pending_commands_file, running_commands_file, commands

    def _remove_pbs_files(self, path_job_commands):
        """Delete the generated PBS scripts ('job_commands_*.sh') so a later
        resume can be checked to have recreated them."""
        for f in os.listdir(path_job_commands):
            if f.startswith('job_commands_') and f.endswith('.sh'):
                os.remove(pjoin(path_job_commands, f))

    def _count_lines(self, filename):
        """Return the number of lines in `filename`, closing the file."""
        with open(filename) as f:
            return len(f.readlines())

    def test_main_launch(self):
        # Actual test
        exit_status = call(self.launch_command, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)

        batch_uid = os.listdir(self.logs_dir)[0]
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        # One PBS file per unfolded command, plus the shared 'commands.txt'.
        assert_equal(len(os.listdir(path_job_commands)), self.nb_commands + 1)

    def test_launch_using_commands_file(self):
        # Actual test: feed the commands through a file (-f) instead of
        # a folded command-line.
        commands_filename = "commands_to_run.txt"
        with open(commands_filename, 'w') as commands_file:
            commands_file.write("\n".join(self.commands))

        launch_command = self.smart_dispatch_command + " -f {0} launch".format(commands_filename)
        exit_status = call(launch_command, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)

        batch_uid = os.listdir(self.logs_dir)[0]
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        assert_equal(len(os.listdir(path_job_commands)), self.nb_commands + 1)
        with open(pjoin(path_job_commands, 'commands.txt')) as commands_file:
            assert_equal(commands_file.read(), "\n".join(self.commands) + "\n")

    def test_main_launch_with_pool_of_workers(self):
        # Actual test
        exit_status = call(self.launch_command_with_pool, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)

        batch_uid = os.listdir(self.logs_dir)[0]
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        # With --pool, one PBS file per worker (not per command), plus 'commands.txt'.
        assert_equal(len(os.listdir(path_job_commands)), self.nb_workers + 1)

    def test_main_launch_with_cores_command(self):
        # Actual test: both 0 and an absurdly large core count must be rejected.
        exit_status_0 = call(self.launch_command_with_cores.format(cores=0), shell=True)
        exit_status_100 = call(self.launch_command_with_cores.format(cores=100), shell=True)

        # Test validation: the script exits with status 2 on invalid -c values.
        assert_equal(exit_status_0, 2)
        assert_equal(exit_status_100, 2)
        assert_true(os.path.isdir(self.logs_dir))

    def test_main_launch_with_gpus_command(self):
        # Actual test: 0 GPUs is valid, an absurdly large count is not.
        exit_status_0 = call(self.launch_command_with_gpus.format(gpus=0), shell=True)
        exit_status_100 = call(self.launch_command_with_gpus.format(gpus=100), shell=True)

        # Test validation
        assert_equal(exit_status_0, 0)
        assert_equal(exit_status_100, 2)
        assert_true(os.path.isdir(self.logs_dir))

    @patch('subprocess.check_output')
    def test_launch_job_check(self, mock_check_output):
        """The script must exit(2) — not crash — when the launcher fails."""
        # For this test we call main() directly instead of going through the
        # shell, so that subprocess.check_output can be mocked.
        mock_check_output.side_effect = subprocess.CalledProcessError(1, 1, "A wild error appeared!")
        argv = ['-t', '0:0:1', '-G', '1', '-C', '1', '-q', 'random', 'launch', 'echo', 'testing123']

        # Failure case: the CalledProcessError raised by the launcher must be
        # converted into a clean SystemExit by the script.
        try:
            with self.assertRaises(SystemExit) as context:
                smartdispatch_script.main(argv=argv)
        except subprocess.CalledProcessError:
            self.fail("smartdispatch_script.main() raised CalledProcessError unexpectedly:\n {}".format(traceback.format_exc()))
        else:
            # BUGFIX: the original assertion sat *inside* the `with` block
            # after the raising call (dead code) and used assertTrue(x, msg)
            # instead of assertEqual. Verify the exit code here, after the
            # context manager has captured the exception.
            self.assertEqual(context.exception.code, 2)

        # Success case: the script runs normally when the launcher succeeds.
        mock_check_output.side_effect = None
        mock_check_output.return_value = ""

        try:
            smartdispatch_script.main(argv=argv)
        except SystemExit:
            self.fail("The launcher had no problem, but the script failed nonetheless.")

    def test_main_resume(self):
        # Setup
        call(self.launch_command, shell=True)
        batch_uid = os.listdir(self.logs_dir)[0]

        # Simulate that some commands are in the running state.
        (_, pending_commands_file,
         running_commands_file, commands) = self._simulate_running_commands(batch_uid)

        # Actual test (should move running commands back to pending).
        exit_status = call(self.resume_command.format(batch_uid), shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)
        assert_equal(self._count_lines(running_commands_file), 0)
        assert_equal(self._count_lines(pending_commands_file), len(commands))

        # Test when batch_uid is a path instead of a jobname.
        # Setup
        batch_uid = os.path.join(self.logs_dir, os.listdir(self.logs_dir)[0])

        # Simulate that some commands are in the running state.
        (_, pending_commands_file,
         running_commands_file, commands) = self._simulate_running_commands(batch_uid)

        # Actual test (should move running commands back to pending).
        exit_status = call(self.resume_command.format(batch_uid), shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)
        assert_equal(self._count_lines(running_commands_file), 0)
        assert_equal(self._count_lines(pending_commands_file), len(commands))

    def test_main_resume_by_expanding_pool_default(self):
        # Create SMART_DISPATCH_LOGS structure.
        call(self.launch_command, shell=True)
        batch_uid = os.listdir(self.logs_dir)[0]

        # Simulate that some commands are in the running state.
        nb_commands_files = 2  # 'commands.txt' and 'running_commands.txt'
        (path_job_commands, pending_commands_file,
         running_commands_file, commands) = self._simulate_running_commands(batch_uid)

        # Remove PBS files so we can check that new ones are going to be created.
        self._remove_pbs_files(path_job_commands)

        # Should NOT move running commands back to pending but should add new workers.
        command_line = self.resume_command.format(batch_uid) + " --expandPool"
        exit_status = call(command_line, shell=True)

        # Test validation: running/pending splits are untouched.
        assert_equal(exit_status, 0)
        assert_equal(self._count_lines(running_commands_file), len(commands[::2]))
        assert_equal(self._count_lines(pending_commands_file), len(commands[1::2]))

        # Default --expandPool adds one worker (one new PBS file) per pending command.
        nb_job_commands_files = len(os.listdir(path_job_commands))
        assert_equal(nb_job_commands_files - nb_commands_files, len(commands[1::2]))

    def test_main_resume_by_expanding_pool(self):
        # Create SMART_DISPATCH_LOGS structure.
        call(self.launch_command, shell=True)
        batch_uid = os.listdir(self.logs_dir)[0]

        # Simulate that some commands are in the running state.
        nb_commands_files = 2  # 'commands.txt' and 'running_commands.txt'
        (path_job_commands, pending_commands_file,
         running_commands_file, commands) = self._simulate_running_commands(batch_uid)

        # Remove PBS files so we can check that new ones are going to be created.
        self._remove_pbs_files(path_job_commands)

        # Should NOT move running commands back to pending but should add new workers.
        nb_workers_to_add = 3
        command_line = self.resume_command.format(batch_uid)
        command_line += " --expandPool {}".format(nb_workers_to_add)
        exit_status = call(command_line, shell=True)

        # Test validation: running/pending splits are untouched.
        assert_equal(exit_status, 0)
        assert_equal(self._count_lines(running_commands_file), len(commands[::2]))
        assert_equal(self._count_lines(pending_commands_file), len(commands[1::2]))

        # Exactly the requested number of new PBS files were created.
        nb_job_commands_files = len(os.listdir(path_job_commands))
        assert_equal(nb_job_commands_files - nb_commands_files, nb_workers_to_add)