Completed
Pull Request — master (#153)
by
unknown
57s
created

test_launch_using_commands_file()   A

Complexity

Conditions 1

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
dl 0
loc 17
rs 9.4285
c 1
b 0
f 0
1
import os
2
import unittest
3
import tempfile
4
import shutil
5
from os.path import join as pjoin, abspath
6
7
from subprocess import call
8
9
from nose.tools import assert_true, assert_equal
10
11
12
class TestSmartdispatcher(unittest.TestCase):
13
14
    def setUp(self):
15
        self.testing_dir = tempfile.mkdtemp()
16
        self.logs_dir = os.path.join(self.testing_dir, 'SMART_DISPATCH_LOGS')
17
18
        self.folded_commands = 'echo "[1 2 3 4]" "[6 7 8]" "[9 0]"'
19
        self.commands = ["echo 1 6 9", "echo 1 6 0", "echo 1 7 9", "echo 1 7 0", "echo 1 8 9", "echo 1 8 0",
20
                         "echo 2 6 9", "echo 2 6 0", "echo 2 7 9", "echo 2 7 0", "echo 2 8 9", "echo 2 8 0",
21
                         "echo 3 6 9", "echo 3 6 0", "echo 3 7 9", "echo 3 7 0", "echo 3 8 9", "echo 3 8 0",
22
                         "echo 4 6 9", "echo 4 6 0", "echo 4 7 9", "echo 4 7 0", "echo 4 8 9", "echo 4 8 0"]
23
        self.nb_commands = len(self.commands)
24
25
        scripts_path = abspath(pjoin(os.path.dirname(__file__), os.pardir, "scripts"))
26
        self.smart_dispatch_command = '{} -C 1 -M 1 -q test -t 5:00 -x'.format(pjoin(scripts_path, 'smart-dispatch'))
27
        self.launch_command = "{0} launch {1}".format(self.smart_dispatch_command, self.folded_commands)
28
        self.resume_command = "{0} resume {{0}}".format(self.smart_dispatch_command)
29
30
        smart_dispatch_command_with_pool = '{} --pool 10 -C 1 -M 1 -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
31
        self.launch_command_with_pool = smart_dispatch_command_with_pool.format('launch ' + self.folded_commands)
32
        self.nb_workers = 10
33
34
        smart_dispatch_command_with_cores = '{} -C 1 -M 1 -c {{cores}} -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
35
        self.launch_command_with_cores = smart_dispatch_command_with_cores.format('launch ' + self.folded_commands, cores='{cores}')
36
37
        smart_dispatch_command_with_memory = '{} -C 1 -M 1 -m {{memory}} -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
38
        self.launch_command_with_memory = smart_dispatch_command_with_memory.format('launch ' + self.folded_commands, memory='{memory}')
39
40
        self._cwd = os.getcwd()
41
        os.chdir(self.testing_dir)
42
43
    def tearDown(self):
44
        os.chdir(self._cwd)
45
        shutil.rmtree(self.testing_dir)
46
47
    def test_main_launch(self):
48
        # Actual test
49
        exit_status = call(self.launch_command, shell=True)
50
51
        # Test validation
52
        assert_equal(exit_status, 0)
53
        assert_true(os.path.isdir(self.logs_dir))
54
        assert_equal(len(os.listdir(self.logs_dir)), 1)
55
56
        batch_uid = os.listdir(self.logs_dir)[0]
57
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
58
        assert_equal(len(os.listdir(path_job_commands)), self.nb_commands + 1)
59
60
    def test_launch_using_commands_file(self):
61
        # Actual test
62
        commands_filename = "commands_to_run.txt"
63
        open(commands_filename, 'w').write("\n".join(self.commands))
64
65
        launch_command = self.smart_dispatch_command + " -f {0} launch".format(commands_filename)
66
        exit_status = call(launch_command, shell=True)
67
68
        # Test validation
69
        assert_equal(exit_status, 0)
70
        assert_true(os.path.isdir(self.logs_dir))
71
        assert_equal(len(os.listdir(self.logs_dir)), 1)
72
73
        batch_uid = os.listdir(self.logs_dir)[0]
74
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
75
        assert_equal(len(os.listdir(path_job_commands)), self.nb_commands + 1)
76
        assert_equal(open(pjoin(path_job_commands, 'commands.txt')).read(), "\n".join(self.commands) + "\n")
77
78
    def test_main_launch_with_pool_of_workers(self):
79
        # Actual test
80
        exit_status = call(self.launch_command_with_pool, shell=True)
81
82
        # Test validation
83
        assert_equal(exit_status, 0)
84
        assert_true(os.path.isdir(self.logs_dir))
85
        assert_equal(len(os.listdir(self.logs_dir)), 1)
86
87
        batch_uid = os.listdir(self.logs_dir)[0]
88
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
89
        assert_equal(len(os.listdir(path_job_commands)), self.nb_workers + 1)
90
91
    def test_main_launch_with_cores_command(self):
92
        # Actual test
93
        exit_status_0 = call(self.launch_command_with_cores.format(cores=0), shell=True)
94
        exit_status_100 = call(self.launch_command_with_cores.format(cores=100), shell=True)
95
96
        # Test validation
97
        assert_equal(exit_status_0, 2)
98
        assert_equal(exit_status_100, 2)        
99
        assert_true(os.path.isdir(self.logs_dir))
100
101
    def test_main_launch_with_memory_command(self):
102
        # Actual test
103
        exit_status_0 = call(self.launch_command_with_memory.format(memory=0), shell=True)
104
        exit_status_05 = call(self.launch_command_with_memory.format(memory=0.5), shell=True)
105
        exit_status_100 = call(self.launch_command_with_memory.format(memory=100), shell=True)
106
107
        # Test validation
108
        assert_equal(exit_status_0, 2)
109
        assert_equal(exit_status_05, 0)
110
        assert_equal(exit_status_100, 2)        
111
        assert_true(os.path.isdir(self.logs_dir))
112
113
    def test_main_resume(self):
114
        # Setup
115
        call(self.launch_command, shell=True)
116
        batch_uid = os.listdir(self.logs_dir)[0]
117
118
        # Simulate that some commands are in the running state.
119
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
120
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
121
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
122
        commands = open(pending_commands_file).read().strip().split("\n")
123
        with open(running_commands_file, 'w') as running_commands:
124
            running_commands.write("\n".join(commands[::2]) + "\n")
125
        with open(pending_commands_file, 'w') as pending_commands:
126
            pending_commands.write("\n".join(commands[1::2]) + "\n")
127
128
        # Actual test (should move running commands back to pending).
129
        exit_status = call(self.resume_command.format(batch_uid), shell=True)
130
131
        # Test validation
132
        assert_equal(exit_status, 0)
133
        assert_true(os.path.isdir(self.logs_dir))
134 View Code Duplication
        assert_equal(len(os.listdir(self.logs_dir)), 1)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
135
        assert_equal(len(open(running_commands_file).readlines()), 0)
136
        assert_equal(len(open(pending_commands_file).readlines()), len(commands))
137
138
        # Test when batch_uid is a path instead of a jobname.
139
        # Setup
140
        batch_uid = os.path.join(self.logs_dir, os.listdir(self.logs_dir)[0])
141
142
        # Simulate that some commands are in the running state.
143
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
144
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
145
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
146
        commands = open(pending_commands_file).read().strip().split("\n")
147
        with open(running_commands_file, 'w') as running_commands:
148
            running_commands.write("\n".join(commands[::2]) + "\n")
149
        with open(pending_commands_file, 'w') as pending_commands:
150
            pending_commands.write("\n".join(commands[1::2]) + "\n")
151
152
        # Actual test (should move running commands back to pending).
153
        exit_status = call(self.resume_command.format(batch_uid), shell=True)
154
155
        # Test validation
156
        assert_equal(exit_status, 0)
157
        assert_true(os.path.isdir(self.logs_dir))
158
        assert_equal(len(os.listdir(self.logs_dir)), 1)
159
        assert_equal(len(open(running_commands_file).readlines()), 0)
160
        assert_equal(len(open(pending_commands_file).readlines()), len(commands))
161
162
    def test_main_resume_by_expanding_pool_default(self):
163
        # Create SMART_DISPATCH_LOGS structure.
164
        call(self.launch_command, shell=True)
165
        batch_uid = os.listdir(self.logs_dir)[0]
166
167
        # Simulate that some commands are in the running state.
168 View Code Duplication
        nb_commands_files = 2  # 'commands.txt' and 'running_commands.txt'
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
169
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
170
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
171
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
172
        commands = open(pending_commands_file).read().strip().split("\n")
173
        with open(running_commands_file, 'w') as running_commands:
174
            running_commands.write("\n".join(commands[::2]) + "\n")
175
        with open(pending_commands_file, 'w') as pending_commands:
176
            pending_commands.write("\n".join(commands[1::2]) + "\n")
177
178
        # Remove PBS files so we can check that new ones are going to be created.
179
        for f in os.listdir(path_job_commands):
180
            if f.startswith('job_commands_') and f.endswith('.sh'):
181
                os.remove(pjoin(path_job_commands, f))
182
183
        # Should NOT move running commands back to pending but should add new workers.
184
        command_line = self.resume_command.format(batch_uid)
185
        command_line += " --expandPool"
186
        exit_status = call(command_line, shell=True)
187
188
        # Test validation
189
        assert_equal(exit_status, 0)
190
        assert_equal(len(open(running_commands_file).readlines()), len(commands[::2]))
191
        assert_equal(len(open(pending_commands_file).readlines()), len(commands[1::2]))
192
193
        nb_job_commands_files = len(os.listdir(path_job_commands))
194
        assert_equal(nb_job_commands_files-nb_commands_files, len(commands[1::2]))
195
196
    def test_main_resume_by_expanding_pool(self):
197
        # Create SMART_DISPATCH_LOGS structure.
198
        call(self.launch_command, shell=True)
199
        batch_uid = os.listdir(self.logs_dir)[0]
200
201
        # Simulate that some commands are in the running state.
202
        nb_commands_files = 2  # 'commands.txt' and 'running_commands.txt'
203
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
204
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
205
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
206
        commands = open(pending_commands_file).read().strip().split("\n")
207
        with open(running_commands_file, 'w') as running_commands:
208
            running_commands.write("\n".join(commands[::2]) + "\n")
209
        with open(pending_commands_file, 'w') as pending_commands:
210
            pending_commands.write("\n".join(commands[1::2]) + "\n")
211
212
        # Remove PBS files so we can check that new ones are going to be created.
213
        for f in os.listdir(path_job_commands):
214
            if f.startswith('job_commands_') and f.endswith('.sh'):
215
                os.remove(pjoin(path_job_commands, f))
216
217
        # Should NOT move running commands back to pending but should add new workers.
218
        nb_workers_to_add = 3
219
        command_line = self.resume_command.format(batch_uid)
220
        command_line += " --expandPool {}".format(nb_workers_to_add)
221
        exit_status = call(command_line, shell=True)
222
223
        # Test validation
224
        assert_equal(exit_status, 0)
225
        assert_equal(len(open(running_commands_file).readlines()), len(commands[::2]))
226
        assert_equal(len(open(pending_commands_file).readlines()), len(commands[1::2]))
227
228
        nb_job_commands_files = len(os.listdir(path_job_commands))
229
        assert_equal(nb_job_commands_files-nb_commands_files, nb_workers_to_add)
230