Code Duplication    Length = 33-34 lines in 2 locations

tests/test_smart_dispatch.py 2 locations

@@ 168-201 (lines=34) @@
165
        batch_uid = os.listdir(self.logs_dir)[0]
166
167
        # Simulate that some commands are in the running state.
168
        nb_commands_files = 2  # 'commands.txt' and 'running_commands.txt'
169
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
170
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
171
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
172
        commands = open(pending_commands_file).read().strip().split("\n")
173
        with open(running_commands_file, 'w') as running_commands:
174
            running_commands.write("\n".join(commands[::2]) + "\n")
175
        with open(pending_commands_file, 'w') as pending_commands:
176
            pending_commands.write("\n".join(commands[1::2]) + "\n")
177
178
        # Remove PBS files so we can check that new ones are going to be created.
179
        for f in os.listdir(path_job_commands):
180
            if f.startswith('job_commands_') and f.endswith('.sh'):
181
                os.remove(pjoin(path_job_commands, f))
182
183
        # Should NOT move running commands back to pending but should add new workers.
184
        command_line = self.resume_command.format(batch_uid)
185
        command_line += " --expandPool"
186
        exit_status = call(command_line, shell=True)
187
188
        # Test validation
189
        assert_equal(exit_status, 0)
190
        assert_equal(len(open(running_commands_file).readlines()), len(commands[::2]))
191
        assert_equal(len(open(pending_commands_file).readlines()), len(commands[1::2]))
192
193
        nb_job_commands_files = len(os.listdir(path_job_commands))
194
        assert_equal(nb_job_commands_files-nb_commands_files, len(commands[1::2]))
195
196
    def test_main_resume_by_expanding_pool(self):
197
        # Create SMART_DISPATCH_LOGS structure.
198
        call(self.launch_command, shell=True)
199
        batch_uid = os.listdir(self.logs_dir)[0]
200
201
        # Simulate that some commands are in the running state.
202
        nb_commands_files = 2  # 'commands.txt' and 'running_commands.txt'
203
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
204
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
@@ 134-166 (lines=33) @@
131
        # Test validation
132
        assert_equal(exit_status, 0)
133
        assert_true(os.path.isdir(self.logs_dir))
134
        assert_equal(len(os.listdir(self.logs_dir)), 1)
135
        assert_equal(len(open(running_commands_file).readlines()), 0)
136
        assert_equal(len(open(pending_commands_file).readlines()), len(commands))
137
138
        # Test when batch_uid is a path instead of a jobname.
139
        # Setup
140
        batch_uid = os.path.join(self.logs_dir, os.listdir(self.logs_dir)[0])
141
142
        # Simulate that some commands are in the running state.
143
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
144
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
145
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
146
        commands = open(pending_commands_file).read().strip().split("\n")
147
        with open(running_commands_file, 'w') as running_commands:
148
            running_commands.write("\n".join(commands[::2]) + "\n")
149
        with open(pending_commands_file, 'w') as pending_commands:
150
            pending_commands.write("\n".join(commands[1::2]) + "\n")
151
152
        # Actual test (should move running commands back to pending).
153
        exit_status = call(self.resume_command.format(batch_uid), shell=True)
154
155
        # Test validation
156
        assert_equal(exit_status, 0)
157
        assert_true(os.path.isdir(self.logs_dir))
158
        assert_equal(len(os.listdir(self.logs_dir)), 1)
159
        assert_equal(len(open(running_commands_file).readlines()), 0)
160
        assert_equal(len(open(pending_commands_file).readlines()), len(commands))
161
162
    def test_main_resume_by_expanding_pool_default(self):
163
        # Create SMART_DISPATCH_LOGS structure.
164
        call(self.launch_command, shell=True)
165
        batch_uid = os.listdir(self.logs_dir)[0]
166
167
        # Simulate that some commands are in the running state.
168
        nb_commands_files = 2  # 'commands.txt' and 'running_commands.txt'
169
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")