Master   A
last analyzed

Complexity

Total Complexity 10

Size/Duplication

Total Lines 77
Duplicated Lines 0 %

Importance

Changes 2
Bugs 0 Features 1
Metric Value
c 2
b 0
f 1
dl 0
loc 77
rs 10
wmc 10

3 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 19 1
A stop() 0 5 1
D run() 0 47 8
"""Package defining master and slave threads

.. Authors:
    Philippe Dessauw
    [email protected]

.. Sponsor:
    Alden Dima
    [email protected]
    Information Systems Group
    Software and Systems Division
    Information Technology Laboratory
    National Institute of Standards and Technology
    http://www.nist.gov/itl/ssd/is
"""
16
import logging
17
from os import listdir, getpid, remove
18
from os.path import join, exists, split
19
from socket import gethostbyname, gethostname
20
from time import sleep
21
from shutil import move
22
from pipeline.files import FileManager
23
from pipeline.threads import StoppableThread
24
from pipeline.utils import create_data_directory
25
from pipeline.logger import AppLogger, LogWriter
26
from pipeline.queue import QueueManager, CommandQueueItem
27
28
29
class Master(StoppableThread):
    """Master worker.

    Watches the input directory, turns every incoming file into a command
    pushed on the Redis-backed ``commands`` queue, and moves results popped
    from the ``finished`` queue into the output directory.
    """

    def __init__(self, app_config):
        """Initialize queues, loggers and directories from the configuration.

        Args:
            app_config (dict): Application configuration; must provide the
                ``redis`` (host/port), ``dirs`` (input/output/temp) and
                ``sleep`` sections read below.
        """
        StoppableThread.__init__(self)

        redis_ip = app_config["redis"]["host"]
        redis_port = app_config["redis"]["port"]

        self.logger = AppLogger("master", logging.getLogger("local"), redis_ip, redis_port)
        self.log_writer = LogWriter(logging.getLogger("app"), redis_ip, redis_port)

        self.command_queue = QueueManager(host=redis_ip, port=redis_port, qname="commands")
        self.finished_queue = QueueManager(host=redis_ip, port=redis_port, qname="finished")
        self.fman = FileManager(app_config)

        self.config = app_config
        self.input = app_config["dirs"]["input"]
        self.output = app_config["dirs"]["output"]

    def run(self):
        """Main loop: enqueue new input files and collect finished jobs.

        Runs until :meth:`stop` is called; sleeps between iterations to
        avoid busy-waiting.
        """
        self.log_writer.start()
        self.logger.info("Starting master...")

        while not self.is_stopped():
            self.logger.info("Reading input directory...")
            filenames = listdir(self.input)

            if filenames:
                self.logger.info("%d file(s) to put in the queue" % len(filenames))

                for filename in filenames:
                    self.logger.debug("Processing %s..." % filename)
                    full_filename = join(self.input, filename)
                    dirname = create_data_directory(full_filename, self.config["dirs"]["temp"])

                    # FIX: only log and enqueue when the data directory was
                    # actually created (previously "None has been created."
                    # could be logged).
                    if dirname is not None:
                        self.logger.debug("%s has been created." % dirname)
                        self.command_queue.push(CommandQueueItem(filename=dirname, logger=self.logger,
                                                                 config=self.config))

                # FIX: summary message emitted once per batch, not once per file.
                self.logger.info("Incoming files have been put in the queue")

            if len(self.finished_queue) > 0:
                self.logger.info("Finished queue not empty")

                while not self.finished_queue.is_empty():
                    filename = self.finished_queue.pop()

                    # Remove any stale copy first so shutil.move cannot fail
                    # on an existing destination.
                    output_file_path = join(self.config["dirs"]["output"], split(filename)[1])
                    if exists(output_file_path):
                        remove(output_file_path)

                    move(filename, self.config["dirs"]["output"])

                self.logger.info("No more finished job to process")

            sleep(self.config["sleep"]["master"])  # Avoid CPU consumption while waiting

    def stop(self):
        """Stop the master thread and its log writer."""
        self.logger.info("Master stopped")

        self.log_writer.stop()
        StoppableThread.stop(self)
108
class Slave(StoppableThread):
    """Slave worker.

    Pops commands from the shared ``commands`` queue, executes them, and
    either requeues them, reports them finished, or drops them after too
    many failed tries.
    """

    def __init__(self, app_config):
        """Set up queues, logging and the retry limit for this slave.

        Args:
            app_config (dict): Application configuration with ``redis``,
                ``commands`` and ``sleep`` sections.
        """
        StoppableThread.__init__(self)

        self.config = app_config

        redis_host = app_config["redis"]["host"]
        redis_port = app_config["redis"]["port"]

        self.command_queue = QueueManager(host=redis_host, port=redis_port, qname="commands")
        self.finished_queue = QueueManager(host=redis_host, port=redis_port, qname="finished")

        # Unique worker identifier: <ip>::<pid>
        uid = "%s::%s" % (gethostbyname(gethostname()), getpid())

        self.logger = AppLogger(uid, logging.getLogger("local"), redis_host, redis_port)
        self.max_tries = app_config["commands"]["tries"]

        self.logger.info("Slave initiated [redis on " + redis_host + "]")

    def run(self):
        """Main loop: execute queued commands until the thread is stopped."""
        self.logger.info("Starting slave...")

        while not self.is_stopped():
            if not self.command_queue.is_empty():
                cmd_json = self.command_queue.pop()
                self.logger.debug("CommandQueueItem(jsondata=%s, ...)" % str(cmd_json))
                cmd = CommandQueueItem(jsondata=cmd_json, logger=self.logger, config=self.config)

                # Start the job after waiting sync between master and worker
                sleep(self.config["sleep"]["job"])
                status = cmd.execute()

                if status == 1 and cmd.tries >= self.max_tries:
                    # Job failed and exhausted its tries: drop it.
                    self.logger.error("Error when processing command")
                elif cmd.current_step == -1:
                    # Pipeline complete: hand the result over to the master.
                    self.logger.info("Pushing to finished queue")
                    self.finished_queue.push(cmd.filename)
                    self.logger.info("Job done")
                else:
                    # More steps remain (or a retry is allowed): requeue.
                    self.command_queue.push(cmd)

            sleep(self.config["sleep"]["worker"])  # Avoid CPU consumption while waiting

    def stop(self):
        """Log shutdown and stop the thread."""
        self.logger.info("Slave stopped")
        StoppableThread.stop(self)
166