TimeoutReader.read()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 8
rs 9.4285
1
import unittest
2
from mock import Mock
3
import random
4
import time
5
from kafka_influxdb.worker import Worker
6
from kafka_influxdb.encoder import echo_encoder
7
from kafka_influxdb.tests.helpers.timeout import timeout
8
from requests.exceptions import ConnectionError
9
from mock import patch
10
11
12
class Config(object):
    """
    Dummy config with minimum settings to pass the tests.

    Only stores the values given to the constructor plus a few fixed
    settings; it performs no validation.
    """

    def __init__(self, buffer_size, buffer_timeout):
        """
        :param buffer_size: stored as ``self.buffer_size``; the tests in this
            file use it as the number of messages handed to the readers
        :param buffer_timeout: stored as ``self.buffer_timeout``; the tests in
            this file pass it to ``TimeoutReader`` as the sleep duration
        """
        self.buffer_size = buffer_size
        self.buffer_timeout = buffer_timeout
        # Fixed placeholder values used by every test
        self.kafka_topic = "test"
        self.influxdb_dbname = "mydb"
        self.statistics = False
23
24
25
class DummyReader(object):
    """
    A reader that yields dummy messages
    """

    def __init__(self, messages, num_messages):
        """
        :param messages: sequence of candidate messages to pick from
        :param num_messages: how many messages ``read()`` yields in total
        """
        self.messages = messages
        self.num_messages = num_messages

    def read(self):
        """
        Yield ``num_messages`` messages, each picked at random from
        ``messages``.

        :raises KeyboardInterrupt: once all messages have been yielded
        """
        for _ in range(self.num_messages):
            yield random.choice(self.messages)
        # Simulate keyboard interrupt by user to stop consuming
        raise KeyboardInterrupt
39
40
41
class TimeoutReader(object):
    """
    A reader that yields all but one of the messages, then sleeps to
    simulate a timeout
    """

    def __init__(self, message, num_messages, timeout):
        """
        :param message: the message yielded on every iteration
        :param num_messages: ``read()`` yields ``num_messages - 1`` messages
        :param timeout: number of seconds to sleep after the last message
        """
        self.message = message
        self.num_messages = num_messages
        self.timeout = timeout

    def read(self):
        """
        Yield ``num_messages - 1`` copies of the message, sleep for
        ``timeout`` seconds, then yield ``False``.

        :raises SystemExit: when iteration continues past the ``False`` item
        """
        # One message short of num_messages
        for _ in range(self.num_messages - 1):
            yield self.message
        # Simulate no additional messages causing timeout
        time.sleep(self.timeout)
        yield False
        # Stop consuming
        raise SystemExit
59
60
61
class FlakyReader(object):
    """
    A fake reader that throws exceptions to simulate
    connection errors
    """

    def __init__(self, message, num_messages):
        """
        :param message: the message yielded on every iteration
        :param num_messages: total number of messages ``read()`` yields
        """
        self.message = message
        self.num_messages = num_messages

    def read(self):
        """
        Yield ``num_messages`` copies of the message in two batches, with an
        exception raised and handled internally between them.

        :raises KeyboardInterrupt: once all messages have been yielded
        """
        # Split so that both halves always add up to num_messages,
        # including when num_messages is odd.
        first_half = self.num_messages // 2
        second_half = self.num_messages - first_half

        # Yield the first half of messages
        for _ in range(first_half):
            yield self.message
        # Simulate a connection error while reading
        try:
            raise Exception
        except Exception:
            # Continue like you don't care.
            # Yield the second half of messages
            for _ in range(second_half):
                yield self.message
        # Simulate keyboard interrupt by user to stop consuming
        raise KeyboardInterrupt
85
86
87
class TestWorker(unittest.TestCase):
    """
    Tests for message worker.
    """

    def setUp(self):
        """
        Build a config, an echo encoder and a mock writer whose ``write``
        returns True for every test.
        """
        self.config = Config(10, 0.1)
        self.encoder = echo_encoder.Encoder()
        self.writer = Mock()
        self.writer.write.return_value = True

    # time.sleep is replaced with a no-op mock for this test
    # (presumably so the worker's retry does not actually wait -- confirm
    # against Worker).
    @patch('time.sleep', return_value=None)
    def test_create_database(self, _):
        """
        Retry creating the InfluxDB database in case of a connection error
        """
        reader = DummyReader(["bar"], self.config.buffer_size)
        writer_with_connection_error = Mock()
        # First call raises ConnectionError, second call succeeds
        writer_with_connection_error.create_database = Mock(
            side_effect=[ConnectionError, None])

        client = Worker(reader, self.encoder,
                        writer_with_connection_error, self.config)
        client.consume()
        self.assertEqual(
            writer_with_connection_error.create_database.call_count, 2)

    def test_flush(self):
        """
        Messages should be flushed to the writer when the buffer is full.
        """
        reader = DummyReader(["bar"], self.config.buffer_size)
        client = Worker(reader, self.encoder, self.writer, self.config)
        client.consume()
        self.assertTrue(self.writer.write.called)
        self.writer.write.assert_called_once_with(
            ["bar"] * self.config.buffer_size)

    def test_buffer_timeout(self):
        """
        If the buffer has timed out flush to the writer.
        """
        reader = TimeoutReader(
            "bar", self.config.buffer_size, self.config.buffer_timeout)
        client = Worker(reader, self.encoder, self.writer, self.config)
        client.consume()
        self.assertTrue(self.writer.write.called)
        # TimeoutReader yields one message fewer than buffer_size
        self.writer.write.assert_called_once_with(
            ["bar"] * (self.config.buffer_size - 1))

    # NOTE(review): `timeout` is a project test helper; presumably it fails
    # the test when it runs longer than 0.1 seconds -- confirm against
    # kafka_influxdb.tests.helpers.timeout.
    @timeout(0.1)
    def test_reconnect(self):
        """
        The worker should reconnect when the connection is interrupted.
        """
        reader = FlakyReader("baz", self.config.buffer_size)
        client = Worker(reader, self.encoder, self.writer, self.config)
        client.consume()
        self.assertTrue(self.writer.write.called)
        self.writer.write.assert_called_once_with(
            ["baz"] * self.config.buffer_size)
148