Completed: Pull Request — master (#167)
by unknown, 33s, created

TestSmartdispatcher.tearDown()   A

Complexity: Conditions 1
Size: Total Lines 3
Duplication: Lines 0, Ratio 0 %
Importance: Changes 1, Bugs 0, Features 0

Metric  Value
cc      1
dl      0
loc     3
rs      10
c       1
b       0
f       0
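
For context, per-function numbers like these (cyclomatic complexity and line counts) can be reproduced locally. The following is a minimal sketch only, assuming the radon package; the analysis service behind this page is not named here and its exact metric definitions may differ.

# Minimal sketch (assumption: radon is installed; its metrics may differ
# slightly from the service that produced the table above).
from radon.complexity import cc_visit
from radon.raw import analyze

SOURCE = (
    "def tearDown(self):\n"
    "    os.chdir(self._cwd)\n"
    "    shutil.rmtree(self.testing_dir)\n"
)

# Cyclomatic complexity: a single linear path with no branches, so cc == 1.
for block in cc_visit(SOURCE):
    print(block.name, block.complexity)

# Raw size: 3 lines, matching "Total Lines 3" / loc 3 above.
print(analyze(SOURCE).loc)
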
import os
import unittest
import tempfile
import shutil
from os.path import join as pjoin, abspath
from mock import patch
from subprocess import call
import subprocess
from nose.tools import assert_true, assert_equal
from smartdispatch import smartdispatch_script


class TestSmartdispatcher(unittest.TestCase):

    def setUp(self):
        self.testing_dir = tempfile.mkdtemp()
        self.logs_dir = os.path.join(self.testing_dir, 'SMART_DISPATCH_LOGS')

        self.folded_commands = 'echo "[1 2 3 4]" "[6 7 8]" "[9 0]"'
        self.commands = ["echo 1 6 9", "echo 1 6 0", "echo 1 7 9", "echo 1 7 0", "echo 1 8 9", "echo 1 8 0",
                         "echo 2 6 9", "echo 2 6 0", "echo 2 7 9", "echo 2 7 0", "echo 2 8 9", "echo 2 8 0",
                         "echo 3 6 9", "echo 3 6 0", "echo 3 7 9", "echo 3 7 0", "echo 3 8 9", "echo 3 8 0",
                         "echo 4 6 9", "echo 4 6 0", "echo 4 7 9", "echo 4 7 0", "echo 4 8 9", "echo 4 8 0"]
        self.nb_commands = len(self.commands)

        scripts_path = abspath(pjoin(os.path.dirname(__file__), os.pardir, "scripts"))
        self.smart_dispatch_command = '{} -C 1 -G 1 -q test -t 5:00 -x'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launch_command = "{0} launch {1}".format(self.smart_dispatch_command, self.folded_commands)
        self.resume_command = "{0} resume {{0}}".format(self.smart_dispatch_command)

        self.smart_dispatch_launcher_command = '{} -C 1 -G 1 -q test -t 5:00'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launcher_command = "{0} launch {1}".format(self.smart_dispatch_launcher_command, self.folded_commands)

        smart_dispatch_command_with_pool = '{} --pool 10 -C 1 -G 1 -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launch_command_with_pool = smart_dispatch_command_with_pool.format('launch ' + self.folded_commands)
        self.nb_workers = 10

        smart_dispatch_command_with_cores = '{} -C 1 -G 1 -c {{cores}} -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launch_command_with_cores = smart_dispatch_command_with_cores.format('launch ' + self.folded_commands, cores='{cores}')

        smart_dispatch_command_with_gpus = '{} -C 1 -G 1 -g {{gpus}} -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launch_command_with_gpus = smart_dispatch_command_with_gpus.format('launch ' + self.folded_commands, gpus='{gpus}')

        self._cwd = os.getcwd()
        os.chdir(self.testing_dir)

    def tearDown(self):
        os.chdir(self._cwd)
        shutil.rmtree(self.testing_dir)

    def test_main_launch(self):
        # Actual test
        exit_status = call(self.launch_command, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)

        batch_uid = os.listdir(self.logs_dir)[0]
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        assert_equal(len(os.listdir(path_job_commands)), self.nb_commands + 1)

    def test_launch_using_commands_file(self):
        # Actual test
        commands_filename = "commands_to_run.txt"
        with open(commands_filename, 'w') as commands_file:
            commands_file.write("\n".join(self.commands))

        launch_command = self.smart_dispatch_command + " -f {0} launch".format(commands_filename)
        exit_status = call(launch_command, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)

        batch_uid = os.listdir(self.logs_dir)[0]
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        assert_equal(len(os.listdir(path_job_commands)), self.nb_commands + 1)
        assert_equal(open(pjoin(path_job_commands, 'commands.txt')).read(), "\n".join(self.commands) + "\n")

    def test_main_launch_with_pool_of_workers(self):
        # Actual test
        exit_status = call(self.launch_command_with_pool, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)

        batch_uid = os.listdir(self.logs_dir)[0]
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        assert_equal(len(os.listdir(path_job_commands)), self.nb_workers + 1)

    def test_main_launch_with_cores_command(self):
        # Actual test
        exit_status_0 = call(self.launch_command_with_cores.format(cores=0), shell=True)
        exit_status_100 = call(self.launch_command_with_cores.format(cores=100), shell=True)

        # Test validation
        assert_equal(exit_status_0, 2)
        assert_equal(exit_status_100, 2)
        assert_true(os.path.isdir(self.logs_dir))

    def test_main_launch_with_gpus_command(self):
        # Actual test
        exit_status_100 = call(self.launch_command_with_gpus.format(gpus=100), shell=True)

        # Test validation
        assert_equal(exit_status_100, 2)
        assert_true(os.path.isdir(self.logs_dir))

    @patch('subprocess.check_output')
    def test_launch_job_check(self, mock_check_output):

        # For this test, we won't call the script directly, since we want to mock subprocess.check_output.
        mock_check_output.side_effect = subprocess.CalledProcessError(1, 1, "A wild error appeared!")
        argv = ['-t', '0:0:1', '-G', '1', '-C', '1', '-q', 'random', 'launch', 'echo', 'testing123']

        try:
            with self.assertRaises(SystemExit) as context:
                smartdispatch_script.main(argv=argv)

            # The exit code check must happen outside the `with` block, otherwise it is never reached.
            self.assertEqual(context.exception.code, 2)

        except subprocess.CalledProcessError:
            self.fail("smartdispatch_script.main() raised CalledProcessError unexpectedly!")

    def test_main_resume(self):
        # Setup
        call(self.launch_command, shell=True)
        batch_uid = os.listdir(self.logs_dir)[0]

        # Simulate that some commands are in the running state.
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
        commands = open(pending_commands_file).read().strip().split("\n")
        with open(running_commands_file, 'w') as running_commands:
            running_commands.write("\n".join(commands[::2]) + "\n")
        with open(pending_commands_file, 'w') as pending_commands:
            pending_commands.write("\n".join(commands[1::2]) + "\n")

        # Actual test (should move running commands back to pending).
        exit_status = call(self.resume_command.format(batch_uid), shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)
        assert_equal(len(open(running_commands_file).readlines()), 0)
        assert_equal(len(open(pending_commands_file).readlines()), len(commands))

        # Test when batch_uid is a path instead of a jobname.
        # Setup
        batch_uid = os.path.join(self.logs_dir, os.listdir(self.logs_dir)[0])

        # Simulate that some commands are in the running state.
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
        commands = open(pending_commands_file).read().strip().split("\n")
        with open(running_commands_file, 'w') as running_commands:
            running_commands.write("\n".join(commands[::2]) + "\n")
        with open(pending_commands_file, 'w') as pending_commands:
            pending_commands.write("\n".join(commands[1::2]) + "\n")

        # Actual test (should move running commands back to pending).
        exit_status = call(self.resume_command.format(batch_uid), shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)
        assert_equal(len(open(running_commands_file).readlines()), 0)
        assert_equal(len(open(pending_commands_file).readlines()), len(commands))

    def test_main_resume_by_expanding_pool_default(self):
        # Create SMART_DISPATCH_LOGS structure.
        call(self.launch_command, shell=True)
        batch_uid = os.listdir(self.logs_dir)[0]

        # Simulate that some commands are in the running state.
        nb_commands_files = 2  # 'commands.txt' and 'running_commands.txt'
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
        commands = open(pending_commands_file).read().strip().split("\n")
        with open(running_commands_file, 'w') as running_commands:
            running_commands.write("\n".join(commands[::2]) + "\n")
        with open(pending_commands_file, 'w') as pending_commands:
            pending_commands.write("\n".join(commands[1::2]) + "\n")

        # Remove PBS files so we can check that new ones are going to be created.
        for f in os.listdir(path_job_commands):
            if f.startswith('job_commands_') and f.endswith('.sh'):
                os.remove(pjoin(path_job_commands, f))

        # Should NOT move running commands back to pending but should add new workers.
        command_line = self.resume_command.format(batch_uid)
        command_line += " --expandPool"
        exit_status = call(command_line, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_equal(len(open(running_commands_file).readlines()), len(commands[::2]))
        assert_equal(len(open(pending_commands_file).readlines()), len(commands[1::2]))

        nb_job_commands_files = len(os.listdir(path_job_commands))
        assert_equal(nb_job_commands_files - nb_commands_files, len(commands[1::2]))

    def test_main_resume_by_expanding_pool(self):
        # Create SMART_DISPATCH_LOGS structure.
        call(self.launch_command, shell=True)
        batch_uid = os.listdir(self.logs_dir)[0]

        # Simulate that some commands are in the running state.
        nb_commands_files = 2  # 'commands.txt' and 'running_commands.txt'
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
        commands = open(pending_commands_file).read().strip().split("\n")
        with open(running_commands_file, 'w') as running_commands:
            running_commands.write("\n".join(commands[::2]) + "\n")
        with open(pending_commands_file, 'w') as pending_commands:
            pending_commands.write("\n".join(commands[1::2]) + "\n")

        # Remove PBS files so we can check that new ones are going to be created.
        for f in os.listdir(path_job_commands):
            if f.startswith('job_commands_') and f.endswith('.sh'):
                os.remove(pjoin(path_job_commands, f))

        # Should NOT move running commands back to pending but should add new workers.
        nb_workers_to_add = 3
        command_line = self.resume_command.format(batch_uid)
        command_line += " --expandPool {}".format(nb_workers_to_add)
        exit_status = call(command_line, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_equal(len(open(running_commands_file).readlines()), len(commands[::2]))
        assert_equal(len(open(pending_commands_file).readlines()), len(commands[1::2]))

        nb_job_commands_files = len(os.listdir(path_job_commands))
        assert_equal(nb_job_commands_files - nb_commands_files, nb_workers_to_add)
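
Note on the duplication flags: the analysis marks the "simulate that some commands are in the running state" block as duplicated across the resume tests. Below is a minimal sketch of one way it could be factored out; the helper name _simulate_running_commands is hypothetical and not part of the project.

    def _simulate_running_commands(self, path_job_commands):
        # Hypothetical helper: move every other pending command into
        # running_commands.txt to mimic a partially executed batch.
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
        commands = open(pending_commands_file).read().strip().split("\n")
        with open(running_commands_file, 'w') as running_commands:
            running_commands.write("\n".join(commands[::2]) + "\n")
        with open(pending_commands_file, 'w') as pending_commands:
            pending_commands.write("\n".join(commands[1::2]) + "\n")
        return commands, pending_commands_file, running_commands_file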
246