Completed — Push to master (d395f4...15cd23) by Mathieu, in 8s

TestSmartdispatcher — class rating: A

Complexity

    Total Complexity    22

Size/Duplication

    Total Lines         190
    Duplicated Lines    35.26 %

Metric                          Value
dl   (duplicated lines)         67
loc  (lines of code)            190
rs                              10
wmc  (weighted method count)    22

8 Methods

Rating   Name   Duplication (lines)   Size (lines)   Complexity
A test_main_launch_with_pool_of_workers() 0 12 1
A tearDown() 0 3 1
B test_main_resume() 0 48 5
A test_main_launch() 0 12 1
A setUp() 0 22 1
A test_launch_using_commands_file() 0 17 1
B test_main_resume_by_expanding_pool_default() 33 33 6
B test_main_resume_by_expanding_pool() 34 34 6
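
The per-method figures above come from cyclomatic-complexity analysis, and the class-level Total Complexity / wmc of 22 is simply their sum (1 + 1 + 5 + 1 + 1 + 1 + 6 + 6). A comparable breakdown can be reproduced locally; the sketch below uses the radon package, which is not necessarily the tool behind this inspection, and the file name test_smartdispatcher.py is only a placeholder for the source listed further down.

# Minimal sketch, assuming the 'radon' package is installed; radon is not
# necessarily what this inspection runs, and 'test_smartdispatcher.py' is a
# placeholder file name for the source reproduced below.
from radon.complexity import cc_visit

with open("test_smartdispatcher.py") as source_file:
    blocks = cc_visit(source_file.read())

for block in blocks:
    # Class blocks expose their methods; each method carries its own
    # cyclomatic complexity, comparable to the Complexity column above.
    for method in getattr(block, "methods", []):
        print(method.name, method.complexity)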

How to fix: Duplicated Code

Duplicated code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions include extracting a repeated block into a shared helper, pulling shared behaviour up into a base class, or moving it into a separate module. The duplication flagged in this class is of the first kind: the same "simulate running commands" block recurs across several test methods, so a shared helper would remove it (see the sketch below).
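
As an illustration only, here is how that block could be factored out. The helper name _split_into_running_and_pending is hypothetical and not part of the project; inside the test class it would naturally become a method taking self.

from os.path import join as pjoin


def _split_into_running_and_pending(path_job_commands):
    # Hypothetical helper: move every other pending command into
    # 'running_commands.txt', exactly as the duplicated blocks in the
    # source below do, and return what the assertions need afterwards.
    pending_commands_file = pjoin(path_job_commands, "commands.txt")
    running_commands_file = pjoin(path_job_commands, "running_commands.txt")
    commands = open(pending_commands_file).read().strip().split("\n")
    with open(running_commands_file, 'w') as running_commands:
        running_commands.write("\n".join(commands[::2]) + "\n")
    with open(pending_commands_file, 'w') as pending_commands:
        pending_commands.write("\n".join(commands[1::2]) + "\n")
    return commands, pending_commands_file, running_commands_file

Each flagged test would then call the helper once instead of repeating the block, which accounts for most of the 33 and 34 duplicated lines reported above. The full source analyzed by the inspection follows.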

import os
import unittest
import tempfile
import shutil
from os.path import join as pjoin, abspath

from subprocess import call

from nose.tools import assert_true, assert_equal


class TestSmartdispatcher(unittest.TestCase):

    def setUp(self):
        self.testing_dir = tempfile.mkdtemp()
        self.logs_dir = os.path.join(self.testing_dir, 'SMART_DISPATCH_LOGS')

        self.folded_commands = 'echo "[1 2 3 4]" "[6 7 8]" "[9 0]"'
        self.commands = ["echo 1 6 9", "echo 1 6 0", "echo 1 7 9", "echo 1 7 0", "echo 1 8 9", "echo 1 8 0",
                         "echo 2 6 9", "echo 2 6 0", "echo 2 7 9", "echo 2 7 0", "echo 2 8 9", "echo 2 8 0",
                         "echo 3 6 9", "echo 3 6 0", "echo 3 7 9", "echo 3 7 0", "echo 3 8 9", "echo 3 8 0",
                         "echo 4 6 9", "echo 4 6 0", "echo 4 7 9", "echo 4 7 0", "echo 4 8 9", "echo 4 8 0"]
        self.nb_commands = len(self.commands)

        scripts_path = abspath(pjoin(os.path.dirname(__file__), os.pardir, "scripts"))
        self.smart_dispatch_command = '{} -C 1 -q test -t 5:00 -x'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launch_command = "{0} launch {1}".format(self.smart_dispatch_command, self.folded_commands)
        self.resume_command = "{0} resume {{0}}".format(self.smart_dispatch_command)

        smart_dispatch_command_with_pool = '{} --pool 10 -C 1 -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
        self.launch_command_with_pool = smart_dispatch_command_with_pool.format('launch ' + self.folded_commands)
        self.nb_workers = 10

        self._cwd = os.getcwd()
        os.chdir(self.testing_dir)

    def tearDown(self):
        os.chdir(self._cwd)
        shutil.rmtree(self.testing_dir)

    def test_main_launch(self):
        # Actual test
        exit_status = call(self.launch_command, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)

        batch_uid = os.listdir(self.logs_dir)[0]
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        assert_equal(len(os.listdir(path_job_commands)), self.nb_commands + 1)

    def test_launch_using_commands_file(self):
        # Actual test
        commands_filename = "commands_to_run.txt"
        open(commands_filename, 'w').write("\n".join(self.commands))

        launch_command = self.smart_dispatch_command + " -f {0} launch".format(commands_filename)
        exit_status = call(launch_command, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)

        batch_uid = os.listdir(self.logs_dir)[0]
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        assert_equal(len(os.listdir(path_job_commands)), self.nb_commands + 1)
        assert_equal(open(pjoin(path_job_commands, 'commands.txt')).read(), "\n".join(self.commands) + "\n")

    def test_main_launch_with_pool_of_workers(self):
        # Actual test
        exit_status = call(self.launch_command_with_pool, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)

        batch_uid = os.listdir(self.logs_dir)[0]
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        assert_equal(len(os.listdir(path_job_commands)), self.nb_workers + 1)

    def test_main_resume(self):
        # Setup
        call(self.launch_command, shell=True)
        batch_uid = os.listdir(self.logs_dir)[0]

        # Simulate that some commands are in the running state.
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
        commands = open(pending_commands_file).read().strip().split("\n")
        with open(running_commands_file, 'w') as running_commands:
            running_commands.write("\n".join(commands[::2]) + "\n")
        with open(pending_commands_file, 'w') as pending_commands:
            pending_commands.write("\n".join(commands[1::2]) + "\n")

        # Actual test (should move running commands back to pending).
        exit_status = call(self.resume_command.format(batch_uid), shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)
        assert_equal(len(open(running_commands_file).readlines()), 0)
        assert_equal(len(open(pending_commands_file).readlines()), len(commands))

        # Test when batch_uid is a path instead of a jobname.
        # Setup
        batch_uid = os.path.join(self.logs_dir, os.listdir(self.logs_dir)[0])

        # Simulate that some commands are in the running state.
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
        commands = open(pending_commands_file).read().strip().split("\n")
        with open(running_commands_file, 'w') as running_commands:
            running_commands.write("\n".join(commands[::2]) + "\n")
        with open(pending_commands_file, 'w') as pending_commands:
            pending_commands.write("\n".join(commands[1::2]) + "\n")

        # Actual test (should move running commands back to pending).
        exit_status = call(self.resume_command.format(batch_uid), shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_true(os.path.isdir(self.logs_dir))
        assert_equal(len(os.listdir(self.logs_dir)), 1)
        assert_equal(len(open(running_commands_file).readlines()), 0)
        assert_equal(len(open(pending_commands_file).readlines()), len(commands))

    def test_main_resume_by_expanding_pool_default(self):
        # Create SMART_DISPATCH_LOGS structure.
        call(self.launch_command, shell=True)
        batch_uid = os.listdir(self.logs_dir)[0]

        # Simulate that some commands are in the running state.
        nb_commands_files = 2  # 'commands.txt' and 'running_commands.txt'
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
        commands = open(pending_commands_file).read().strip().split("\n")
        with open(running_commands_file, 'w') as running_commands:
            running_commands.write("\n".join(commands[::2]) + "\n")
        with open(pending_commands_file, 'w') as pending_commands:
            pending_commands.write("\n".join(commands[1::2]) + "\n")

        # Remove PBS files so we can check that new ones are going to be created.
        for f in os.listdir(path_job_commands):
            if f.startswith('job_commands_') and f.endswith('.sh'):
                os.remove(pjoin(path_job_commands, f))

        # Should NOT move running commands back to pending but should add new workers.
        command_line = self.resume_command.format(batch_uid)
        command_line += " --expandPool"
        exit_status = call(command_line, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_equal(len(open(running_commands_file).readlines()), len(commands[::2]))
        assert_equal(len(open(pending_commands_file).readlines()), len(commands[1::2]))

        nb_job_commands_files = len(os.listdir(path_job_commands))
        assert_equal(nb_job_commands_files-nb_commands_files, len(commands[1::2]))

    def test_main_resume_by_expanding_pool(self):
        # Create SMART_DISPATCH_LOGS structure.
        call(self.launch_command, shell=True)
        batch_uid = os.listdir(self.logs_dir)[0]

        # Simulate that some commands are in the running state.
        nb_commands_files = 2  # 'commands.txt' and 'running_commands.txt'
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
        commands = open(pending_commands_file).read().strip().split("\n")
        with open(running_commands_file, 'w') as running_commands:
            running_commands.write("\n".join(commands[::2]) + "\n")
        with open(pending_commands_file, 'w') as pending_commands:
            pending_commands.write("\n".join(commands[1::2]) + "\n")

        # Remove PBS files so we can check that new ones are going to be created.
        for f in os.listdir(path_job_commands):
            if f.startswith('job_commands_') and f.endswith('.sh'):
                os.remove(pjoin(path_job_commands, f))

        # Should NOT move running commands back to pending but should add new workers.
        nb_workers_to_add = 3
        command_line = self.resume_command.format(batch_uid)
        command_line += " --expandPool {}".format(nb_workers_to_add)
        exit_status = call(command_line, shell=True)

        # Test validation
        assert_equal(exit_status, 0)
        assert_equal(len(open(running_commands_file).readlines()), len(commands[::2]))
        assert_equal(len(open(pending_commands_file).readlines()), len(commands[1::2]))

        nb_job_commands_files = len(os.listdir(path_job_commands))
        assert_equal(nb_job_commands_files-nb_commands_files, nb_workers_to_add)