QueueManager   A
last analyzed

Complexity

Total Complexity 6

Size/Duplication

Total Lines 38
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 38
rs 10
wmc 6

5 Methods

Rating   Name   Duplication   Size   Complexity  
A __len__() 0 2 1
A __init__() 0 7 2
A is_empty() 0 7 1
A push() 0 7 1
A pop() 0 7 1
1
""" Package defining Redis queue
2
3
.. Authors:
4
    Philippe Dessauw
5
    [email protected]
6
7
.. Sponsor:
8
    Alden Dima
9
    [email protected]
10
    Information Systems Group
11
    Software and Systems Division
12
    Information Technology Laboratory
13
    National Institute of Standards and Technology
14
    http://www.nist.gov/itl/ssd/is
15
"""
16
import json
17
import redis
18
from pipeline.commands import *
19
20
21
class QueueManager(object):
22
    """ Redis queue manager.
23
    """
24
25
    def __init__(self, host="127.0.0.1", port=6379, db=0, qname=None):
26
        self.server = redis.StrictRedis(host, port, db)
27
28
        if qname is not None:
29
            self.queue_name = qname
30
        else:
31
            self.queue_name = "default"
32
33
    def push(self, json_object):
34
        """Push JSON on a redis queue
35
36
        Parameters
37
            json_object (dict): JSON to push to redis
38
        """
39
        self.server.rpush(self.queue_name, json_object)
40
41
    def pop(self):
42
        """Pop object from a redis queue
43
44
        Returns
45
            dict: JSON from redis
46
        """
47
        return self.server.lpop(self.queue_name)
48
49
    def is_empty(self):
50
        """Test if the queue is empty or not
51
52
        Returns
53
            bool: True if empty, false otherwise
54
        """
55
        return len(self) == 0
56
57
    def __len__(self):
58
        return self.server.llen(self.queue_name)
59
60
61
class CommandQueueItem(object):
62
    """ Command stored in the redis queue.
63
    """
64
65
    def __init__(self, filename="", jsondata="", logger=None, config=None):
66
        if filename != "":
67
            self.current_step = 0
68
            self.filename = filename
69
            self.tries = 0
70
        else:  # Rebuild command from JSON
71
            data = json.loads(jsondata)
72
            self.current_step = data["command"]
73
            self.filename = data["filename"]
74
            self.tries = data["tries"]
75
76
        # self.filename = join(self.filename)
77
        self.logger = logger
78
        self.config = config
79
80
        # Building the command list
81
        self.steps = []
82
        for cmd in self.config["commands"]["list"]:
83
            cmd_class = None
84
            cmd_config = self.config
85
86
            if type(cmd) == str:
87
                cmd_class = eval(cmd)
88
            elif type(cmd) == dict:
89
                if len(cmd.keys()) == 1:
90
                    cmd_class = eval(cmd.keys()[0])
91
                    cmd_config["command"] = cmd.values()[0]
92
            if cmd_class is None:
93
                self.logger.fatal("Unreadable command list")
94
                raise SyntaxError(
95
                    "Command list is not correctly formatted"
96
                )
97
98
            self.steps.append(
99
                cmd_class(self.filename, self.logger, cmd_config)
100
            )
101
102
    def execute(self):
103
        """Execute the command
104
105
        Returns:
106
            int: 0 when no errors happen, >0 otherwise
107
        """
108
        command = self.steps[self.current_step]
109
        cmd_result = command.execute()
110
111
        if cmd_result == 1:  # The process has failed
112
            self.tries += 1
113
            return 1
114
115
        # Current step is incremented
116
        self.current_step += 1
117
118
        # Stop flag
119
        if self.current_step >= len(self.steps):
120
            self.current_step = -1
121
            self.tries = 0
122
123
        return 0
124
125
    def __str__(self):
126
        repr_str = {
127
            "command": self.current_step,
128
            "filename": self.filename,
129
            "tries": self.tries
130
        }
131
132
        return json.dumps(repr_str)
133
134
135
136