Completed
Pull Request — master (#49)
by
unknown
01:11
created

TimeoutReader.__init__()   A

Complexity

Conditions 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 4
rs 10
1
import unittest
2
from mock import Mock
3
import random
4
import time
5
from kafka_influxdb.worker import Worker
6
from kafka_influxdb.encoder import echo_encoder
7
from kafka_influxdb.tests.helpers.timeout import timeout
8
from kafka.common import ConnectionError
9
10
11
class Config(object):
    """
    Dummy config with minimum settings to pass the tests.

    Only ``buffer_size`` and ``buffer_timeout`` are configurable; the
    remaining attributes are fixed placeholder values.
    """
    def __init__(self, buffer_size, buffer_timeout):
        # Caller-supplied buffering settings, stored unchanged.
        # NOTE(review): presumably a message count and a duration in
        # seconds -- confirm against the Worker implementation.
        self.buffer_size = buffer_size
        self.buffer_timeout = buffer_timeout
        # Hard-coded values shared by every test.
        self.kafka_topic = "test"
        self.influxdb_dbname = "mydb"
        self.statistics = False
21
22
23
class DummyReader(object):
    """
    Fake reader producing a fixed number of randomly picked messages.
    """
    def __init__(self, messages, num_messages):
        self.messages, self.num_messages = messages, num_messages

    def read(self):
        """
        Yield ``num_messages`` items, each drawn at random from
        ``messages``, then raise ``KeyboardInterrupt``.
        """
        total = self.num_messages
        for _ in range(total):
            picked = random.choice(self.messages)
            yield picked
        # Once exhausted, behave as if the user pressed Ctrl-C so the
        # consumer stops.
        raise KeyboardInterrupt
36
37
class TimeoutReader(object):
    """
    A reader that yields all but one of the requested messages and then
    stalls for ``timeout`` seconds before signalling that nothing arrived.
    """
    def __init__(self, message, num_messages, timeout):
        self.message = message
        self.num_messages = num_messages
        self.timeout = timeout

    def read(self):
        """
        Yield ``message`` ``num_messages - 1`` times, sleep for ``timeout``
        seconds, yield ``False`` once, then raise ``KeyboardInterrupt``.
        """
        # Yield one message fewer than num_messages
        for _ in range(self.num_messages - 1):
            yield self.message
        # Simulate no additional messages causing timeout
        time.sleep(self.timeout)
        # NOTE(review): False presumably stands for "no message received";
        # confirm against how the worker treats falsy items.
        yield False
        # Simulate keyboard interrupt by user to stop consuming
        raise KeyboardInterrupt
55
56
class FlakyReader(object):
    """
    A fake reader that throws exceptions to simulate
    connection errors
    """
    def __init__(self, message, num_messages):
        self.message = message
        self.num_messages = num_messages

    def read(self):
        """
        Yield ``message`` ``num_messages`` times in total, with a simulated
        error raised and handled between the two halves, then raise
        ``KeyboardInterrupt``.
        """
        # Split so that both parts always add up to num_messages, even
        # when the count is odd.
        first_half = self.num_messages // 2
        second_half = self.num_messages - first_half
        # Yield the first half of messages
        for _ in range(first_half):
            yield self.message
        # Simulate a connection error while reading
        try:
            raise Exception
        except Exception:
            # Continue like you don't care.
            # Yield the remaining messages
            for _ in range(second_half):
                yield self.message
        # Simulate keyboard interrupt by user to stop consuming
        raise KeyboardInterrupt
79
80
81
class TestWorker(unittest.TestCase):
    """
    Exercises the message worker against the fake readers defined above.
    """
    def setUp(self):
        """Create a fresh config, encoder and mocked writer for each test."""
        self.config = Config(10, 0.1)
        self.encoder = echo_encoder.Encoder()
        mocked_writer = Mock()
        mocked_writer.write.return_value = True
        self.writer = mocked_writer

    def test_flush(self):
        """
        A full buffer must be handed to the writer in a single write.
        """
        source = DummyReader(["bar"], self.config.buffer_size)
        worker = Worker(source, self.encoder, self.writer, self.config)
        worker.consume()
        write = self.writer.write
        self.assertTrue(write.called)
        expected = ["bar"] * self.config.buffer_size
        write.assert_called_once_with(expected)

    def test_buffer_timeout(self):
        """
        A buffer that has timed out must be flushed to the writer.
        """
        source = TimeoutReader(
            "bar", self.config.buffer_size, self.config.buffer_timeout
        )
        worker = Worker(source, self.encoder, self.writer, self.config)
        worker.consume()
        write = self.writer.write
        self.assertTrue(write.called)
        # The reader delivers one message fewer than the buffer size.
        expected = ["bar"] * int(self.config.buffer_size-1)
        write.assert_called_once_with(expected)

    @timeout(0.1)
    def test_reconnect(self):
        """
        An interrupted connection must not stop the worker from delivering
        the messages.
        """
        source = FlakyReader("baz", self.config.buffer_size)
        worker = Worker(source, self.encoder, self.writer, self.config)
        worker.consume()
        write = self.writer.write
        self.assertTrue(write.called)
        expected = ["baz"] * self.config.buffer_size
        write.assert_called_once_with(expected)
121