Completed
Push — master ( 4e38f3...9b63d4 )
by Mathieu
10s
created

TestSmartdispatcher   A

Complexity

Total Complexity 23

Size/Duplication

Total Lines 203
Duplicated Lines 33 %

Importance

Changes 13
Bugs 0 Features 1
Metric Value
c 13
b 0
f 1
dl 67
loc 203
rs 10
wmc 23

9 Methods

Rating   Name   Duplication   Size   Complexity  
A test_main_launch_with_cores_command() 0 9 1
A test_main_launch_with_pool_of_workers() 0 12 1
A tearDown() 0 3 1
B test_main_resume() 12 48 5
A test_main_launch() 0 12 1
B setUp() 0 25 1
B test_main_resume_by_expanding_pool_default() 32 33 6
B test_main_resume_by_expanding_pool() 21 34 6
A test_launch_using_commands_file() 0 17 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
import os
2
import unittest
3
import tempfile
4
import shutil
5
from os.path import join as pjoin, abspath
6
7
from subprocess import call
8
9
from nose.tools import assert_true, assert_equal
10
11
12
class TestSmartdispatcher(unittest.TestCase):
13
14
    def setUp(self):
15
        self.testing_dir = tempfile.mkdtemp()
16
        self.logs_dir = os.path.join(self.testing_dir, 'SMART_DISPATCH_LOGS')
17
18
        self.folded_commands = 'echo "[1 2 3 4]" "[6 7 8]" "[9 0]"'
19
        self.commands = ["echo 1 6 9", "echo 1 6 0", "echo 1 7 9", "echo 1 7 0", "echo 1 8 9", "echo 1 8 0",
20
                         "echo 2 6 9", "echo 2 6 0", "echo 2 7 9", "echo 2 7 0", "echo 2 8 9", "echo 2 8 0",
21
                         "echo 3 6 9", "echo 3 6 0", "echo 3 7 9", "echo 3 7 0", "echo 3 8 9", "echo 3 8 0",
22
                         "echo 4 6 9", "echo 4 6 0", "echo 4 7 9", "echo 4 7 0", "echo 4 8 9", "echo 4 8 0"]
23
        self.nb_commands = len(self.commands)
24
25
        scripts_path = abspath(pjoin(os.path.dirname(__file__), os.pardir, "scripts"))
26
        self.smart_dispatch_command = '{} -C 1 -q test -t 5:00 -x'.format(pjoin(scripts_path, 'smart-dispatch'))
27
        self.launch_command = "{0} launch {1}".format(self.smart_dispatch_command, self.folded_commands)
28
        self.resume_command = "{0} resume {{0}}".format(self.smart_dispatch_command)
29
30
        smart_dispatch_command_with_pool = '{} --pool 10 -C 1 -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
31
        self.launch_command_with_pool = smart_dispatch_command_with_pool.format('launch ' + self.folded_commands)
32
        self.nb_workers = 10
33
34
        smart_dispatch_command_with_cores = '{} -C 1 -c {{cores}} -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
35
        self.launch_command_with_cores = smart_dispatch_command_with_cores.format('launch ' + self.folded_commands, cores='{cores}')
36
37
        self._cwd = os.getcwd()
38
        os.chdir(self.testing_dir)
39
40
    def tearDown(self):
41
        os.chdir(self._cwd)
42
        shutil.rmtree(self.testing_dir)
43
44
    def test_main_launch(self):
45
        # Actual test
46
        exit_status = call(self.launch_command, shell=True)
47
48
        # Test validation
49
        assert_equal(exit_status, 0)
50
        assert_true(os.path.isdir(self.logs_dir))
51
        assert_equal(len(os.listdir(self.logs_dir)), 1)
52
53
        batch_uid = os.listdir(self.logs_dir)[0]
54
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
55
        assert_equal(len(os.listdir(path_job_commands)), self.nb_commands + 1)
56
57
    def test_launch_using_commands_file(self):
58
        # Actual test
59
        commands_filename = "commands_to_run.txt"
60
        open(commands_filename, 'w').write("\n".join(self.commands))
61
62
        launch_command = self.smart_dispatch_command + " -f {0} launch".format(commands_filename)
63
        exit_status = call(launch_command, shell=True)
64
65
        # Test validation
66
        assert_equal(exit_status, 0)
67
        assert_true(os.path.isdir(self.logs_dir))
68
        assert_equal(len(os.listdir(self.logs_dir)), 1)
69
70
        batch_uid = os.listdir(self.logs_dir)[0]
71
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
72
        assert_equal(len(os.listdir(path_job_commands)), self.nb_commands + 1)
73
        assert_equal(open(pjoin(path_job_commands, 'commands.txt')).read(), "\n".join(self.commands) + "\n")
74
75
    def test_main_launch_with_pool_of_workers(self):
76
        # Actual test
77
        exit_status = call(self.launch_command_with_pool, shell=True)
78
79
        # Test validation
80
        assert_equal(exit_status, 0)
81
        assert_true(os.path.isdir(self.logs_dir))
82
        assert_equal(len(os.listdir(self.logs_dir)), 1)
83
84
        batch_uid = os.listdir(self.logs_dir)[0]
85
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
86
        assert_equal(len(os.listdir(path_job_commands)), self.nb_workers + 1)
87
88
    def test_main_launch_with_cores_command(self):
89
        # Actual test
90
        exit_status_0 = call(self.launch_command_with_cores.format(cores=0), shell=True)
91
        exit_status_100 = call(self.launch_command_with_cores.format(cores=100), shell=True)
92
93
        # Test validation
94
        assert_equal(exit_status_0, 2)
95
        assert_equal(exit_status_100, 2)        
96
        assert_true(os.path.isdir(self.logs_dir))
97
98
    def test_main_resume(self):
99
        # Setup
100
        call(self.launch_command, shell=True)
101
        batch_uid = os.listdir(self.logs_dir)[0]
102
103
        # Simulate that some commands are in the running state.
104
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
105
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
106
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
107
        commands = open(pending_commands_file).read().strip().split("\n")
108
        with open(running_commands_file, 'w') as running_commands:
109
            running_commands.write("\n".join(commands[::2]) + "\n")
110
        with open(pending_commands_file, 'w') as pending_commands:
111
            pending_commands.write("\n".join(commands[1::2]) + "\n")
112
113
        # Actual test (should move running commands back to pending).
114
        exit_status = call(self.resume_command.format(batch_uid), shell=True)
115
116
        # Test validation
117
        assert_equal(exit_status, 0)
118
        assert_true(os.path.isdir(self.logs_dir))
119
        assert_equal(len(os.listdir(self.logs_dir)), 1)
120
        assert_equal(len(open(running_commands_file).readlines()), 0)
121
        assert_equal(len(open(pending_commands_file).readlines()), len(commands))
122
123
        # Test when batch_uid is a path instead of a jobname.
124
        # Setup
125
        batch_uid = os.path.join(self.logs_dir, os.listdir(self.logs_dir)[0])
126
127
        # Simulate that some commands are in the running state.
128
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
129
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
130
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
131
        commands = open(pending_commands_file).read().strip().split("\n")
132
        with open(running_commands_file, 'w') as running_commands:
133
            running_commands.write("\n".join(commands[::2]) + "\n")
134 View Code Duplication
        with open(pending_commands_file, 'w') as pending_commands:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
135
            pending_commands.write("\n".join(commands[1::2]) + "\n")
136
137
        # Actual test (should move running commands back to pending).
138
        exit_status = call(self.resume_command.format(batch_uid), shell=True)
139
140
        # Test validation
141
        assert_equal(exit_status, 0)
142
        assert_true(os.path.isdir(self.logs_dir))
143
        assert_equal(len(os.listdir(self.logs_dir)), 1)
144
        assert_equal(len(open(running_commands_file).readlines()), 0)
145
        assert_equal(len(open(pending_commands_file).readlines()), len(commands))
146
147
    def test_main_resume_by_expanding_pool_default(self):
148
        # Create SMART_DISPATCH_LOGS structure.
149
        call(self.launch_command, shell=True)
150
        batch_uid = os.listdir(self.logs_dir)[0]
151
152
        # Simulate that some commands are in the running state.
153
        nb_commands_files = 2  # 'commands.txt' and 'running_commands.txt'
154
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
155
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
156
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
157
        commands = open(pending_commands_file).read().strip().split("\n")
158
        with open(running_commands_file, 'w') as running_commands:
159
            running_commands.write("\n".join(commands[::2]) + "\n")
160
        with open(pending_commands_file, 'w') as pending_commands:
161
            pending_commands.write("\n".join(commands[1::2]) + "\n")
162
163
        # Remove PBS files so we can check that new ones are going to be created.
164
        for f in os.listdir(path_job_commands):
165
            if f.startswith('job_commands_') and f.endswith('.sh'):
166
                os.remove(pjoin(path_job_commands, f))
167
168 View Code Duplication
        # Should NOT move running commands back to pending but should add new workers.
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
169
        command_line = self.resume_command.format(batch_uid)
170
        command_line += " --expandPool"
171
        exit_status = call(command_line, shell=True)
172
173
        # Test validation
174
        assert_equal(exit_status, 0)
175
        assert_equal(len(open(running_commands_file).readlines()), len(commands[::2]))
176
        assert_equal(len(open(pending_commands_file).readlines()), len(commands[1::2]))
177
178
        nb_job_commands_files = len(os.listdir(path_job_commands))
179
        assert_equal(nb_job_commands_files-nb_commands_files, len(commands[1::2]))
180
181
    def test_main_resume_by_expanding_pool(self):
182
        # Create SMART_DISPATCH_LOGS structure.
183
        call(self.launch_command, shell=True)
184
        batch_uid = os.listdir(self.logs_dir)[0]
185
186
        # Simulate that some commands are in the running state.
187
        nb_commands_files = 2  # 'commands.txt' and 'running_commands.txt'
188
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
189
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
190
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
191
        commands = open(pending_commands_file).read().strip().split("\n")
192
        with open(running_commands_file, 'w') as running_commands:
193
            running_commands.write("\n".join(commands[::2]) + "\n")
194
        with open(pending_commands_file, 'w') as pending_commands:
195
            pending_commands.write("\n".join(commands[1::2]) + "\n")
196
197
        # Remove PBS files so we can check that new ones are going to be created.
198
        for f in os.listdir(path_job_commands):
199
            if f.startswith('job_commands_') and f.endswith('.sh'):
200
                os.remove(pjoin(path_job_commands, f))
201
202
        # Should NOT move running commands back to pending but should add new workers.
203
        nb_workers_to_add = 3
204
        command_line = self.resume_command.format(batch_uid)
205
        command_line += " --expandPool {}".format(nb_workers_to_add)
206
        exit_status = call(command_line, shell=True)
207
208
        # Test validation
209
        assert_equal(exit_status, 0)
210
        assert_equal(len(open(running_commands_file).readlines()), len(commands[::2]))
211
        assert_equal(len(open(pending_commands_file).readlines()), len(commands[1::2]))
212
213
        nb_job_commands_files = len(os.listdir(path_job_commands))
214
        assert_equal(nb_job_commands_files-nb_commands_files, nb_workers_to_add)
215