Completed
Push — master ( 9b5196...efb59b )
by Matthias
37s queued 13s
created

TestWorker.test_reconnect()   A

Complexity

Conditions 1

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 10
rs 9.4285
1
import unittest
2
from mock import Mock
3
import random
4
import time
5
from kafka_influxdb.worker import Worker
6
from kafka_influxdb.encoder import echo_encoder
7
from kafka_influxdb.tests.helpers.timeout import timeout
8
from requests.exceptions import ConnectionError
9
from influxdb.exceptions import InfluxDBServerError, InfluxDBClientError
10
11
12
class Config:
13
    """
14
    Dummy config with minimum settings to pass the tests
15
    """
16
    def __init__(self, buffer_size, buffer_timeout):
17
        self.buffer_size = buffer_size
18
        self.buffer_timeout = buffer_timeout
19
        self.kafka_topic = "test"
20
        self.influxdb_dbname = "mydb"
21
        self.statistics = False
22
23
24
class DummyReader(object):
25
    """
26
    A reader that yields dummy messages
27
    """
28
    def __init__(self, messages, num_messages):
29
        self.messages = messages
30
        self.num_messages = num_messages
31
32
    def read(self):
33
        for i in range(self.num_messages):
34
            yield random.choice(self.messages)
35
        # Simulate keyboard interrupt by user to stop consuming
36
        raise KeyboardInterrupt
37
38
class TimeoutReader(object):
39
    """
40
    A reader that writes half the messages then times out
41
    """
42
    def __init__(self, message, num_messages, timeout):
43
        self.message = message
44
        self.num_messages = num_messages
45
        self.timeout = timeout
46
47
    def read(self):
48
        for i in range(self.num_messages-1):
49
            yield self.message
50
        # Simulate no additional messages causing timeout
51
        time.sleep(self.timeout)
52
        yield False
53
        # Stop consuming
54
        raise SystemExit
55
56
class FlakyReader(object):
57
    """
58
    A fake reader that throws exceptions to simulate
59
    connection errors
60
    """
61
    def __init__(self, message, num_messages):
62
        self.message = message
63
        self.num_messages = num_messages
64
65
    def read(self):
66
        # Yield the first half of messages
67
        for i in range(int(self.num_messages/2)):
68
            yield self.message
69
        # Simulate a connection error while reading
70
        try:
71
            raise Exception
72
        except Exception:
73
            # Continue like you don't care.
74
            # Yield the second half of messages
75
            for i in range(int(self.num_messages/2)):
76
                yield self.message
77
        # Simulate keyboard interrupt by user to stop consuming
78
        raise KeyboardInterrupt
79
80
81
class TestWorker(unittest.TestCase):
82
    """
83
    Tests for message worker.
84
    """
85
    def setUp(self):
86
        self.config = Config(10, 0.1)
87
        self.encoder = echo_encoder.Encoder()
88
        self.writer = Mock()
89
        self.writer.write.return_value = True
90
91
    def test_create_database(self):
92
        """
93
        Retry creating the InfluxDB database in case of a connection error
94
        """
95
        reader = DummyReader(["bar"], self.config.buffer_size)
96
        writer_with_connection_error = Mock()
97
        writer_with_connection_error.create_database = Mock(side_effect=[ConnectionError, None])
98
99
        client = Worker(reader, self.encoder, writer_with_connection_error, self.config)
100
        client.consume()
101
        self.assertEqual(writer_with_connection_error.create_database.call_count, 2)
102
103
    def test_flush(self):
104
        """
105
        Messages should be flushed to the writer when the buffer is full.
106
        """
107
        reader = DummyReader(["bar"], self.config.buffer_size)
108
        client = Worker(reader, self.encoder, self.writer, self.config)
109
        client.consume()
110
        self.assertTrue(self.writer.write.called)
111
        self.writer.write.assert_called_once_with(["bar"] * self.config.buffer_size)
112
113
    def test_buffer_timeout(self):
114
        """
115
        If the buffer has timed out flush to the writer.
116
        """
117
        reader = TimeoutReader("bar", self.config.buffer_size, self.config.buffer_timeout)
118
        client = Worker(reader, self.encoder, self.writer, self.config)
119
        client.consume()
120
        self.assertTrue(self.writer.write.called)
121
        self.writer.write.assert_called_once_with(["bar"] * int(self.config.buffer_size-1))
122
123
    @timeout(0.1)
124
    def test_reconnect(self):
125
        """
126
        The worker should reconnect when the connection is interrupted.
127
        """
128
        reader = FlakyReader("baz", self.config.buffer_size)
129
        client = Worker(reader, self.encoder, self.writer, self.config)
130
        client.consume()
131
        self.assertTrue(self.writer.write.called)
132
        self.writer.write.assert_called_once_with(["baz"] * self.config.buffer_size)
133