Completed
Branch master (e214b7)
by Philippe
36s
created

Slave   A

Complexity

Total Complexity 8

Size/Duplication

Total Lines 54
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 54
rs 10
wmc 8

3 Methods

Rating   Name   Duplication   Size   Complexity  
A stop() 0 3 1
B run() 0 24 6
A __init__() 0 21 1
1
"""Package defining master and slave threads
2
3
.. Authors:
4
    Philippe Dessauw
5
    [email protected]
6
7
.. Sponsor:
8
    Alden Dima
9
    [email protected]
10
    Information Systems Group
11
    Software and Systems Division
12
    Information Technology Laboratory
13
    National Institute of Standards and Technology
14
    http://www.nist.gov/itl/ssd/is
15
"""
16
import logging
17
from os import listdir, getpid, remove
18
from os.path import join, exists, split
19
from socket import gethostbyname, gethostname
20
from time import sleep
21
from shutil import move
22
from apputils.fileop import zip_directory
23
from pipeline.files import FileManager
24
from pipeline.threads import StoppableThread
25
from pipeline.utils import create_data_directory
26
from pipeline.logger import AppLogger, LogWriter
27
from pipeline.queue import QueueManager, CommandQueueItem
28
29
30
class Master(StoppableThread):
    """Master worker thread.

    Polls the input directory and, for every new file, builds a data
    directory, zips it, stores the archive through the FileManager and pushes
    a command onto the ``commands`` queue. It also drains the ``finished``
    queue, retrieving each result file and moving it into the output
    directory.
    """

    # Seconds between two polling rounds (avoids CPU consumption while idle).
    POLL_INTERVAL = 60

    def __init__(self, app_config):
        """Build loggers, queues and file manager from the application config.

        Args:
            app_config (dict): Application configuration. Reads
                ``machines.master[0]``, ``redis.port``, ``dirs.input`` and
                ``dirs.output``.
        """
        StoppableThread.__init__(self)

        # The master entry may carry a "user@" prefix; keep only the host part.
        master_ip = app_config["machines"]["master"][0].split('@')[-1]
        master_queue_port = app_config["redis"]["port"]

        self.logger = AppLogger("master", logging.getLogger("local"), master_ip, master_queue_port)
        self.log_writer = LogWriter(logging.getLogger("app"))

        self.command_queue = QueueManager(host=master_ip, port=master_queue_port, qname="commands")
        self.finished_queue = QueueManager(host=master_ip, port=master_queue_port, qname="finished")
        self.fman = FileManager(master_ip, master_queue_port)

        self.config = app_config
        self.input = app_config["dirs"]["input"]
        self.output = app_config["dirs"]["output"]

    def run(self):
        """Queue new input files and collect finished results until stopped."""
        self.log_writer.start()
        self.logger.debug("Running master...")

        # A set rather than a list: membership is tested for every directory
        # entry on every polling round.
        processed_filenames = set()

        while not self.is_stopped():
            self._queue_new_files(processed_filenames)
            self._collect_finished_files()
            self._wait(self.POLL_INTERVAL)  # Avoid CPU consumption while waiting

    def _queue_new_files(self, processed_filenames):
        """Push a command for each input file not seen yet and record it as processed."""
        self.logger.debug("Reading directory...")
        filenames = [f for f in listdir(self.input) if f not in processed_filenames]

        if filenames:
            self.logger.info(str(len(filenames)) + " file(s) to put in the queue")

        for filename in filenames:
            dirname = create_data_directory(join(self.input, filename))

            # Files for which no data directory was created are still recorded
            # as processed, so they are not looked at again on later rounds.
            if dirname is not None:
                archive = zip_directory(dirname)

                self.fman.store_file(archive)
                self.command_queue.push(CommandQueueItem(filename=archive, logger=self.logger, config=self.config))

            processed_filenames.add(filename)

    def _collect_finished_files(self):
        """Drain the finished queue, moving each result file into the output directory."""
        if len(self.finished_queue) == 0:
            return

        self.logger.info("Finished queue not empty")

        while not self.finished_queue.is_empty():
            filename = self.finished_queue.pop()
            # NOTE(review): retrieve_file is assumed to materialize the file at
            # ``filename`` locally, since it is moved from that path below.
            self.fman.retrieve_file(filename)

            # shutil.move refuses to overwrite a same-named file inside the
            # destination directory, so drop any previous result first.
            output_file_path = join(self.output, split(filename)[1])
            if exists(output_file_path):
                remove(output_file_path)

            move(filename, self.output)
            self.fman.delete_file(filename)

        self.logger.info("No more finished job to process")

    def _wait(self, seconds):
        """Sleep up to ``seconds`` seconds in one-second steps, returning early once stopped."""
        for _ in range(int(seconds)):
            if self.is_stopped():
                return
            sleep(1)

    def stop(self):
        """Log the shutdown, stop the log writer and flag the thread as stopped."""
        self.logger.info("Master stopped")

        self.log_writer.stop()
        StoppableThread.stop(self)
100
101
102
class Slave(StoppableThread):
    """Slave worker thread.

    Pops commands from the master's ``commands`` queue and executes them.
    Completed jobs are reported on the ``finished`` queue; unfinished jobs
    are pushed back onto the ``commands`` queue; jobs that failed after
    ``commands.tries`` attempts are logged and dropped.
    """

    def __init__(self, app_config):
        """Connect to the master's queues and build a per-process logger.

        Args:
            app_config (dict): Application configuration. Reads
                ``machines.master[0]``, ``redis.port`` and ``commands.tries``.
        """
        StoppableThread.__init__(self)

        self.config = app_config

        # The master entry may carry a "user@" prefix; keep only the host part.
        master_ip = app_config["machines"]["master"][0].split('@')[-1]
        master_queue_port = app_config["redis"]["port"]

        self.command_queue = QueueManager(host=master_ip, port=master_queue_port, qname="commands")
        self.finished_queue = QueueManager(host=master_ip, port=master_queue_port, qname="finished")
        self.fman = FileManager(master_ip, master_queue_port)

        # "<ip>::<pid>" distinguishes slaves running on the same host.
        slave_ip = gethostbyname(gethostname())
        slave_pid = getpid()
        uid = slave_ip + "::" + str(slave_pid)

        self.logger = AppLogger(uid, logging.getLogger("local"), master_ip, master_queue_port)
        self.max_tries = app_config["commands"]["tries"]

        self.logger.info("Slave initiated [redis on "+master_ip+"]")

    def run(self):
        """Execute queued commands until the thread is stopped."""
        self.logger.debug("Running slave...")

        while not self.is_stopped():
            cmd_json = None
            if not self.command_queue.is_empty():
                cmd_json = self.command_queue.pop()

            # Another slave sharing the queue may take the item between
            # is_empty() and pop(). NOTE(review): assumes QueueManager.pop
            # returns None in that case - confirm.
            if cmd_json is None:
                sleep(1)  # Avoid CPU consumption while waiting
                continue

            cmd = CommandQueueItem(jsondata=cmd_json, logger=self.logger, config=self.config)

            status = cmd.execute()

            # Job returned an error and has reached the limit of tries
            if status == 1 and cmd.tries >= self.max_tries:
                self.logger.error("Error when processing command")
                continue

            # A current_step of -1 marks a job with no step left to run.
            if cmd.current_step == -1:
                self.logger.info("Pushing to finished queue")
                self.finished_queue.push(cmd.filename)
                self.logger.info("Job done")
                continue

            # Unfinished (or retryable) job: hand it back to the queue.
            self.command_queue.push(cmd)

            sleep(1)  # Avoid CPU consumption while waiting

    def stop(self):
        """Log the shutdown and flag the thread as stopped."""
        self.logger.info("Slave stopped")
        StoppableThread.stop(self)
156