Completed
Push — master ( 0b9edb...ab2b1a )
by Mathieu
10s
created

CommandManager   A

Complexity

Total Complexity 23

Size/Duplication

Total Lines 68
Duplicated Lines 0 %
Metric Value
dl 0
loc 68
rs 10
wmc 23

8 Methods

Rating   Name   Duplication   Size   Complexity  
A get_nb_commands_to_run() 0 3 2
A _move_line_between_files() 0 10 1
A get_command_to_run() 0 8 4
A get_failed_commands() 0 6 3
B reset_running_commands() 0 12 5
A __init__() 0 7 1
A set_running_command_as_finished() 0 9 4
A set_commands_to_run() 0 4 3
1
import os
2
from .filelock import open_with_lock
3
4
5
class CommandManager(object):
    """Track commands through text files that hold one command per line.

    Four files living in the same directory are used:

    * ``<name>``           -- commands still waiting to be run;
    * ``running_<name>``   -- commands handed out by ``get_command_to_run``;
    * ``finished_<name>``  -- commands reported with ``error_code == 0``;
    * ``failed_<name>``    -- commands reported with any other error code.

    Every read-modify-write goes through ``open_with_lock`` (imported from
    ``.filelock``; presumably it holds a lock on the file while it is open --
    confirm against that module).
    """

    def __init__(self, commands_filename):
        """Derive the companion file names from *commands_filename*.

        No file is opened or created here.
        """
        base_path, filename = os.path.split(commands_filename)

        self._commands_filename = commands_filename
        self._running_commands_filename = os.path.join(base_path, "running_" + filename)
        self._finished_commands_filename = os.path.join(base_path, "finished_" + filename)
        self._failed_commands_filename = os.path.join(base_path, "failed_" + filename)

    def _move_line_between_files(self, file1, file2, line):
        """Remove the first occurrence of *line* from *file1* and write it to *file2*.

        *file1* must be readable, writable and seekable; *file2* is written at
        its current position. *line* must match a line of *file1* exactly
        (including its line terminator, if any).

        Raises:
            ValueError: if *line* is not present in *file1*.
        """
        file1.seek(0, os.SEEK_SET)
        remaining = file1.readlines()
        remaining.remove(line)

        # Rewrite file1 in place, then cut off the stale tail.
        file1.seek(0, os.SEEK_SET)
        file1.writelines(remaining)
        file1.truncate()

        # The last line of a file may lack its terminator; always terminate
        # what is written so that a later write starts on its own line and the
        # entry can be found again as ``command + '\n'``.
        if not line.endswith('\n'):
            line += '\n'
        file2.write(line)

    def set_commands_to_run(self, commands):
        """Append each command of *commands* to the pending file, one per line."""
        lines = [command + '\n' for command in commands]
        with open_with_lock(self._commands_filename, 'a') as commands_file:
            commands_file.writelines(lines)

    def get_command_to_run(self):
        """Pop the first pending command and record it as running.

        Returns:
            The command without its line terminator, or ``None`` when the
            pending file is empty.
        """
        with open_with_lock(self._commands_filename, 'r+') as commands_file:
            with open_with_lock(self._running_commands_filename, 'a') as running_commands_file:
                command = commands_file.readline()
                if command == '':
                    return None
                self._move_line_between_files(commands_file, running_commands_file, command)

        # Only strip an actual newline: the final line of the file may not
        # have one, and blindly dropping the last character would corrupt it.
        if command.endswith('\n'):
            return command[:-1]
        return command

    def get_nb_commands_to_run(self):
        """Return the number of lines currently in the pending file.

        The file is read with a plain ``open`` (not ``open_with_lock``).
        """
        with open(self._commands_filename, 'r') as commands_file:
            return sum(1 for _ in commands_file)

    def get_failed_commands(self):
        """Return the lines of the failed file, or ``[]`` if it does not exist.

        The lines are returned as read, i.e. with their line terminators.
        """
        if not os.path.isfile(self._failed_commands_filename):
            return []
        with open(self._failed_commands_filename, 'r') as commands_file:
            return commands_file.readlines()

    def set_running_command_as_finished(self, command, error_code=0):
        """Move *command* from the running file to the finished or failed file.

        Args:
            command: the command as returned by ``get_command_to_run``
                (without line terminator).
            error_code: ``0`` files the command as finished; any other value
                files it as failed.

        Raises:
            ValueError: if *command* is not listed in the running file.
        """
        if error_code == 0:
            file_name = self._finished_commands_filename
        else:
            file_name = self._failed_commands_filename

        with open_with_lock(self._running_commands_filename, 'r+') as running_commands_file:
            with open_with_lock(file_name, 'a') as finished_commands_file:
                self._move_line_between_files(running_commands_file, finished_commands_file, command + '\n')

    def reset_running_commands(self):
        """Put every running command back at the front of the pending file.

        Does nothing when the running file does not exist or is empty.
        """
        if not os.path.isfile(self._running_commands_filename):
            return

        with open_with_lock(self._commands_filename, 'r+') as commands_file:
            with open_with_lock(self._running_commands_filename, 'r+') as running_commands_file:
                running = running_commands_file.readlines()
                if not running:
                    return

                # Empty the running file.
                running_commands_file.seek(0, os.SEEK_SET)
                running_commands_file.truncate()

                # Guard against an unterminated last line being merged with
                # the first pending command.
                if not running[-1].endswith('\n'):
                    running[-1] += '\n'

                # The new content is never shorter than the old one, so no
                # truncate is needed after rewriting from the start.
                pending = commands_file.readlines()
                commands_file.seek(0, os.SEEK_SET)
                commands_file.writelines(running + pending)
73