import os
import unittest
import tempfile
import shutil
from os.path import join as pjoin, abspath

from subprocess import call

from nose.tools import assert_true, assert_equal
11 | |||
class TestSmartdispatcher(unittest.TestCase):
    """Integration tests for the ``smart-dispatch`` command-line script.

    Each test invokes the real script (from the sibling ``scripts`` directory)
    through ``subprocess.call`` inside a throw-away working directory, then
    inspects the ``SMART_DISPATCH_LOGS`` folder the script produces.
    """

    def setUp(self):
        # Isolated scratch directory; we chdir into it so the script writes
        # its SMART_DISPATCH_LOGS folder there.
        self.testing_dir = tempfile.mkdtemp()
        self.logs_dir = os.path.join(self.testing_dir, 'SMART_DISPATCH_LOGS')

        # One "folded" command line that smart-dispatch expands into the
        # 4 x 3 x 2 = 24 concrete commands listed below.
        self.folded_commands = 'echo "[1 2 3 4]" "[6 7 8]" "[9 0]"'
        self.commands = ["echo 1 6 9", "echo 1 6 0", "echo 1 7 9", "echo 1 7 0", "echo 1 8 9", "echo 1 8 0",
                         "echo 2 6 9", "echo 2 6 0", "echo 2 7 9", "echo 2 7 0", "echo 2 8 9", "echo 2 8 0",
                         "echo 3 6 9", "echo 3 6 0", "echo 3 7 9", "echo 3 7 0", "echo 3 8 9", "echo 3 8 0",
                         "echo 4 6 9", "echo 4 6 0", "echo 4 7 9", "echo 4 7 0", "echo 4 8 9", "echo 4 8 0"]
        self.nb_commands = len(self.commands)

        # The script under test lives in <repo>/scripts, next to this test dir.
        scripts_path = abspath(pjoin(os.path.dirname(__file__), os.pardir, "scripts"))
        self.smart_dispatch_command = '{} -C 1 -q test -t 5:00 -x'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launch_command = "{0} launch {1}".format(self.smart_dispatch_command, self.folded_commands)
        # Keeps a literal '{0}' placeholder for the batch uid, filled per test.
        self.resume_command = "{0} resume {{0}}".format(self.smart_dispatch_command)

        smart_dispatch_command_with_pool = '{} --pool 10 -C 1 -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launch_command_with_pool = smart_dispatch_command_with_pool.format('launch ' + self.folded_commands)
        self.nb_workers = 10

        # Keeps a literal '{cores}' placeholder so tests can try invalid values.
        smart_dispatch_command_with_cores = '{} -C 1 -c {{cores}} -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launch_command_with_cores = smart_dispatch_command_with_cores.format('launch ' + self.folded_commands, cores='{cores}')

        self._cwd = os.getcwd()
        os.chdir(self.testing_dir)

    def tearDown(self):
        # Restore the working directory before wiping the scratch area.
        os.chdir(self._cwd)
        shutil.rmtree(self.testing_dir)

    def _split_pending_and_running(self, path_job_commands):
        """Simulate a half-finished batch.

        Moves every other command from ``commands.txt`` (pending) into
        ``running_commands.txt`` (running).

        Returns:
            (pending_commands_file, running_commands_file, commands) where
            ``commands`` is the full original command list.
        """
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
        with open(pending_commands_file) as pending_commands:
            commands = pending_commands.read().strip().split("\n")
        with open(running_commands_file, 'w') as running_commands:
            running_commands.write("\n".join(commands[::2]) + "\n")
        with open(pending_commands_file, 'w') as pending_commands:
            pending_commands.write("\n".join(commands[1::2]) + "\n")
        return pending_commands_file, running_commands_file, commands

    def test_main_launch(self):
        # Actual test
        exit_status = call(self.launch_command, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)

        batch_uid = os.listdir(self.logs_dir)[0]
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        # One worker file per command, plus the shared 'commands.txt'.
        assert_equal(len(os.listdir(path_job_commands)), self.nb_commands + 1)

    def test_launch_using_commands_file(self):
        # Actual test
        commands_filename = "commands_to_run.txt"
        # 'with' ensures the file is flushed and closed before the script
        # reads it (the original leaked the handle via open(...).write(...)).
        with open(commands_filename, 'w') as commands_file:
            commands_file.write("\n".join(self.commands))

        launch_command = self.smart_dispatch_command + " -f {0} launch".format(commands_filename)
        exit_status = call(launch_command, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)

        batch_uid = os.listdir(self.logs_dir)[0]
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        assert_equal(len(os.listdir(path_job_commands)), self.nb_commands + 1)
        assert_equal(open(pjoin(path_job_commands, 'commands.txt')).read(), "\n".join(self.commands) + "\n")

    def test_main_launch_with_pool_of_workers(self):
        # Actual test
        exit_status = call(self.launch_command_with_pool, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)

        batch_uid = os.listdir(self.logs_dir)[0]
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        # With --pool, one worker file per worker (not per command).
        assert_equal(len(os.listdir(path_job_commands)), self.nb_workers + 1)

    def test_main_launch_with_cores_command(self):
        # Actual test: both 0 and an absurdly large core count are invalid.
        exit_status_0 = call(self.launch_command_with_cores.format(cores=0), shell=True)
        exit_status_100 = call(self.launch_command_with_cores.format(cores=100), shell=True)

        # Test validation: the script exits with status 2 on bad arguments.
        assert_equal(exit_status_0, 2)
        assert_equal(exit_status_100, 2)
        assert_true(os.path.isdir(self.logs_dir))

    def test_main_resume(self):
        # Setup
        call(self.launch_command, shell=True)
        batch_uid = os.listdir(self.logs_dir)[0]

        # Simulate that some commands are in the running state.
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        pending_commands_file, running_commands_file, commands = \
            self._split_pending_and_running(path_job_commands)

        # Actual test (should move running commands back to pending).
        exit_status = call(self.resume_command.format(batch_uid), shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)
        assert_equal(len(open(running_commands_file).readlines()), 0)
        assert_equal(len(open(pending_commands_file).readlines()), len(commands))

        # Test when batch_uid is a path instead of a jobname.
        # Setup
        batch_uid = os.path.join(self.logs_dir, os.listdir(self.logs_dir)[0])

        # Simulate that some commands are in the running state.
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        pending_commands_file, running_commands_file, commands = \
            self._split_pending_and_running(path_job_commands)

        # Actual test (should move running commands back to pending).
        exit_status = call(self.resume_command.format(batch_uid), shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)
        assert_equal(len(open(running_commands_file).readlines()), 0)
        assert_equal(len(open(pending_commands_file).readlines()), len(commands))

    def test_main_resume_by_expanding_pool_default(self):
        # Create SMART_DISPATCH_LOGS structure.
        call(self.launch_command, shell=True)
        batch_uid = os.listdir(self.logs_dir)[0]

        # Simulate that some commands are in the running state.
        nb_commands_files = 2  # 'commands.txt' and 'running_commands.txt'
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        pending_commands_file, running_commands_file, commands = \
            self._split_pending_and_running(path_job_commands)

        # Remove PBS files so we can check that new ones are going to be created.
        for f in os.listdir(path_job_commands):
            if f.startswith('job_commands_') and f.endswith('.sh'):
                os.remove(pjoin(path_job_commands, f))

        # Should NOT move running commands back to pending but should add new workers.
        command_line = self.resume_command.format(batch_uid)
        command_line += " --expandPool"
        exit_status = call(command_line, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_equal(len(open(running_commands_file).readlines()), len(commands[::2]))
        assert_equal(len(open(pending_commands_file).readlines()), len(commands[1::2]))

        # Default --expandPool adds one worker per still-pending command.
        nb_job_commands_files = len(os.listdir(path_job_commands))
        assert_equal(nb_job_commands_files - nb_commands_files, len(commands[1::2]))

    def test_main_resume_by_expanding_pool(self):
        # Create SMART_DISPATCH_LOGS structure.
        call(self.launch_command, shell=True)
        batch_uid = os.listdir(self.logs_dir)[0]

        # Simulate that some commands are in the running state.
        nb_commands_files = 2  # 'commands.txt' and 'running_commands.txt'
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        pending_commands_file, running_commands_file, commands = \
            self._split_pending_and_running(path_job_commands)

        # Remove PBS files so we can check that new ones are going to be created.
        for f in os.listdir(path_job_commands):
            if f.startswith('job_commands_') and f.endswith('.sh'):
                os.remove(pjoin(path_job_commands, f))

        # Should NOT move running commands back to pending but should add new workers.
        nb_workers_to_add = 3
        command_line = self.resume_command.format(batch_uid)
        command_line += " --expandPool {}".format(nb_workers_to_add)
        exit_status = call(command_line, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_equal(len(open(running_commands_file).readlines()), len(commands[::2]))
        assert_equal(len(open(pending_commands_file).readlines()), len(commands[1::2]))

        # Explicit --expandPool N adds exactly N worker files.
        nb_job_commands_files = len(os.listdir(path_job_commands))
        assert_equal(nb_job_commands_files - nb_commands_files, nb_workers_to_add)