Completed
Push — master ( 42c846...16c1df )
by Matthias
01:08
created

TimeoutReader.read()   A

Complexity

Conditions 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 8
rs 9.4285
1
import unittest
2
from mock import Mock
3
import random
4
import time
5
from kafka_influxdb.worker import Worker
6
from kafka_influxdb.encoder import echo_encoder
7
from kafka_influxdb.tests.helpers.timeout import timeout
8
from kafka.common import ConnectionError
9
10
11
class Config:
12
    """
13
    Dummy config with minimum settings to pass the tests
14
    """
15
    def __init__(self, buffer_size, buffer_timeout):
16
        self.buffer_size = buffer_size
17
        self.buffer_timeout = buffer_timeout
18
        self.kafka_topic = "test"
19
        self.influxdb_dbname = "mydb"
20
        self.statistics = False
21
22
23
class DummyReader(object):
24
    """
25
    A reader that yields dummy messages
26
    """
27
    def __init__(self, messages, num_messages):
28
        self.messages = messages
29
        self.num_messages = num_messages
30
31
    def read(self):
32
        for i in range(self.num_messages):
33
            yield random.choice(self.messages)
34
        # Simulate keyboard interrupt by user to stop consuming
35
        raise KeyboardInterrupt
36
37
class TimeoutReader(object):
38
    """
39
    A reader that writes half the messages then times out
40
    """
41
    def __init__(self, message, num_messages, timeout):
42
        self.message = message
43
        self.num_messages = num_messages
44
        self.timeout = timeout
45
46
    def read(self):
47
        for i in range(self.num_messages-1):
48
            yield self.message
49
        # Simulate no additional messages causing timeout
50
        time.sleep(self.timeout)
51
        yield False
52
        # Stop consuming
53
        raise SystemExit
54
55
class FlakyReader(object):
56
    """
57
    A fake reader that throws exceptions to simulate
58
    connection errors
59
    """
60
    def __init__(self, message, num_messages):
61
        self.message = message
62
        self.num_messages = num_messages
63
64
    def read(self):
65
        # Yield the first half of messages
66
        for i in range(int(self.num_messages/2)):
67
            yield self.message
68
        # Simulate a connection error while reading
69
        try:
70
            raise Exception
71
        except Exception:
72
            # Continue like you don't care.
73
            # Yield the second half of messages
74
            for i in range(int(self.num_messages/2)):
75
                yield self.message
76
        # Simulate keyboard interrupt by user to stop consuming
77
        raise KeyboardInterrupt
78
79
80
class TestWorker(unittest.TestCase):
81
    """
82
    Tests for message worker.
83
    """
84
    def setUp(self):
85
        self.config = Config(10, 0.1)
86
        self.encoder = echo_encoder.Encoder()
87
        self.writer = Mock()
88
        self.writer.write.return_value = True
89
90
    def test_flush(self):
91
        """
92
        Messages should be flushed to the writer when the buffer is full.
93
        """
94
        reader = DummyReader(["bar"], self.config.buffer_size)
95
        client = Worker(reader, self.encoder, self.writer, self.config)
96
        client.consume()
97
        self.assertTrue(self.writer.write.called)
98
        self.writer.write.assert_called_once_with(["bar"] * self.config.buffer_size)
99
100
    def test_buffer_timeout(self):
101
        """
102
        If the buffer has timed out flush to the writer.
103
        """
104
        reader = TimeoutReader("bar", self.config.buffer_size, self.config.buffer_timeout)
105
        client = Worker(reader, self.encoder, self.writer, self.config)
106
        client.consume()
107
        self.assertTrue(self.writer.write.called)
108
        self.writer.write.assert_called_once_with(["bar"] * int(self.config.buffer_size-1))
109
110
    @timeout(0.1)
111
    def test_reconnect(self):
112
        """
113
        The worker should reconnect when the connection is interrupted.
114
        """
115
        reader = FlakyReader("baz", self.config.buffer_size)
116
        client = Worker(reader, self.encoder, self.writer, self.config)
117
        client.consume()
118
        self.assertTrue(self.writer.write.called)
119
        self.writer.write.assert_called_once_with(["baz"] * self.config.buffer_size)
120