Completed
Push — master ( dc30bb...497f6e )
by Philippe
35s
created

Master.run()   D

Complexity

Conditions 8

Size

Total Lines 47

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 1
Metric Value
cc 8
c 2
b 0
f 1
dl 0
loc 47
rs 4.3478
1
"""Package defining master and slave threads
2
3
.. Authors:
4
    Philippe Dessauw
5
    [email protected]
6
7
.. Sponsor:
8
    Alden Dima
9
    [email protected]
10
    Information Systems Group
11
    Software and Systems Division
12
    Information Technology Laboratory
13
    National Institute of Standards and Technology
14
    http://www.nist.gov/itl/ssd/is
15
"""
16
import logging
17
from os import listdir, getpid, remove
18
from os.path import join, exists, split
19
from socket import gethostbyname, gethostname
20
from time import sleep
21
from shutil import move
22
from pipeline.files import FileManager
23
from pipeline.threads import StoppableThread
24
from pipeline.utils import create_data_directory
25
from pipeline.logger import AppLogger, LogWriter
26
from pipeline.queue import QueueManager, CommandQueueItem
27
28
29
class Master(StoppableThread):
    """ Master worker

    Polls the input directory, pushes one command per input entry on the
    ``commands`` queue, and moves results reported on the ``finished`` queue
    into the output directory.
    """

    def __init__(self, app_config):
        """Create the master from the application configuration.

        :param app_config: application configuration; this class reads
            ``redis.host``, ``redis.port``, ``dirs.input``, ``dirs.output``
            and ``dirs.temp``.
        """
        StoppableThread.__init__(self)

        redis_ip = app_config["redis"]["host"]
        redis_port = app_config["redis"]["port"]

        self.logger = AppLogger("master", logging.getLogger("local"), redis_ip, redis_port)
        self.log_writer = LogWriter(logging.getLogger("app"), redis_ip, redis_port)

        self.command_queue = QueueManager(host=redis_ip, port=redis_port, qname="commands")
        self.finished_queue = QueueManager(host=redis_ip, port=redis_port, qname="finished")
        self.fman = FileManager(app_config)

        self.config = app_config
        self.input = app_config["dirs"]["input"]
        self.output = app_config["dirs"]["output"]

    def run(self):
        """Start the log writer, then poll the input dir and finished queue until stopped."""
        self.log_writer.start()
        self.logger.info("Starting master...")

        while not self.is_stopped():
            self._enqueue_input_files()
            self._collect_finished_jobs()

            # Avoid CPU consumption while waiting.
            # NOTE(review): stop() is only noticed once this sleep returns,
            # so shutdown may lag by up to 60 seconds.
            sleep(60)

    def _enqueue_input_files(self):
        """Push one command per entry of the input directory onto the command queue."""
        self.logger.info("Reading input directory...")
        filenames = listdir(self.input)

        if not filenames:
            return

        self.logger.info("%d file(s) to put in the queue" % len(filenames))

        for filename in filenames:
            self.logger.debug("Processing %s..." % filename)
            full_filename = join(self.input, filename)
            dirname = create_data_directory(full_filename, self.config["dirs"]["temp"])

            if dirname is None:
                # No data directory was produced: nothing to queue for this entry.
                self.logger.debug("%s skipped: no data directory created." % filename)
                continue

            self.logger.debug("%s has been created." % dirname)
            self.command_queue.push(CommandQueueItem(filename=dirname, logger=self.logger,
                                                     config=self.config))

        # Reported once per batch rather than once per file.
        self.logger.info("Incoming files have been put in the queue")

    def _collect_finished_jobs(self):
        """Drain the finished queue, publishing each result into the output directory."""
        if len(self.finished_queue) == 0:
            return

        self.logger.info("Finished queue not empty")

        while not self.finished_queue.is_empty():
            self._publish_result(self.finished_queue.pop())

        self.logger.info("No more finished job to process")

    def _publish_result(self, filename):
        """Move ``filename`` into the output directory, replacing any previous result."""
        from os.path import isdir, islink
        from shutil import rmtree

        output_file_path = join(self.output, split(filename)[1])

        # Queued jobs are the directories returned by create_data_directory, so
        # results are presumably directories too. os.remove cannot delete a
        # directory and shutil.move refuses to move a directory onto an existing
        # one, so a real directory must be removed with rmtree. Symlinks (even
        # to directories) are unlinked with os.remove, as rmtree rejects them.
        if isdir(output_file_path) and not islink(output_file_path):
            rmtree(output_file_path)
        elif exists(output_file_path):
            remove(output_file_path)

        move(filename, self.output)

    def stop(self):
        """Log the shutdown, stop the log writer, then flag this thread as stopped."""
        self.logger.info("Master stopped")

        self.log_writer.stop()
        StoppableThread.stop(self)
106
107
108
class Slave(StoppableThread):
    """ Slave worker

    Pops commands from the ``commands`` queue and executes them. Each command
    is then re-queued, reported on the ``finished`` queue, or dropped once it
    has failed and reached ``commands.tries`` attempts.
    """

    def __init__(self, app_config):
        """Create the slave from the application configuration.

        :param app_config: application configuration; this class reads
            ``redis.host``, ``redis.port`` and ``commands.tries``.
        """
        StoppableThread.__init__(self)

        self.config = app_config

        redis_ip = app_config["redis"]["host"]
        redis_port = app_config["redis"]["port"]

        self.command_queue = QueueManager(host=redis_ip, port=redis_port, qname="commands")
        self.finished_queue = QueueManager(host=redis_ip, port=redis_port, qname="finished")

        # Identify this worker in the logs as "<host ip>::<process id>".
        uid = gethostbyname(gethostname()) + "::" + str(getpid())

        self.logger = AppLogger(uid, logging.getLogger("local"), redis_ip, redis_port)
        self.max_tries = app_config["commands"]["tries"]

        self.logger.info("Slave initiated [redis on "+redis_ip+"]")

    def run(self):
        """Process at most one command per second until the thread is stopped."""
        self.logger.info("Starting slave...")

        while not self.is_stopped():
            if not self.command_queue.is_empty():
                self._process_next_command()

            sleep(1)  # Avoid CPU consumption while waiting

    def _process_next_command(self):
        """Pop one command, execute it, and route it to the appropriate queue."""
        cmd_json = self.command_queue.pop()

        # Slaves presumably share this Redis queue, so another slave may take
        # the last item between is_empty() and pop(). Assumes pop() then
        # returns None -- TODO confirm against QueueManager.
        if cmd_json is None:
            return

        cmd = CommandQueueItem(jsondata=cmd_json, logger=self.logger, config=self.config)
        status = cmd.execute()

        # Job returned an error and has reached the limit of tries: drop it.
        if status == 1 and cmd.tries >= self.max_tries:
            self.logger.error("Error when processing command")
            return

        if cmd.current_step == -1:
            self.logger.info("Pushing to finished queue")
            self.finished_queue.push(cmd.filename)
            self.logger.info("Job done")
            return

        # Either more steps remain or a failed try can be retried: re-queue it.
        self.command_queue.push(cmd)

    def stop(self):
        """Log the shutdown, then flag this thread as stopped."""
        self.logger.info("Slave stopped")
        StoppableThread.stop(self)
163