Completed
Push — master ( 11b435...f96b64 )
by Matthias
53s
created

TestKafkaInfluxDB   A

Complexity

Total Complexity 4

Size/Duplication

Total Lines 27
Duplicated Lines 0 %

Importance

Changes 5
Bugs 0 Features 0
Metric Value
c 5
b 0
f 0
dl 0
loc 27
rs 10
wmc 4

2 Methods

Rating   Name   Duplication   Size   Complexity  
A TestWorker.test_flush() 0 9 1
A TestWorker.setUp() 0 5 1
1
import unittest
2
from mock import Mock
3
import random
4
from kafka_influxdb.worker import Worker
5
from kafka_influxdb.encoder import echo_encoder
6
from kafka_influxdb.tests.helpers.timeout import timeout
7
from kafka.common import ConnectionError
8
9
10
class Config:
11
    """
12
    Dummy config with minimum settings to pass the tests
13
    """
14
    def __init__(self, buffer_size):
15
        self.buffer_size = buffer_size
16
        self.kafka_topic = "test"
17
        self.influxdb_dbname = "mydb"
18
        self.statistics = False
19
20
21
class DummyReader(object):
22
    """
23
    A reader that yields dummy messages
24
    """
25
    def __init__(self, messages, num_messages):
26
        self.messages = messages
27
        self.num_messages = num_messages
28
29
    def read(self):
30
        for i in range(self.num_messages):
31
            yield random.choice(self.messages)
32
        # Simulate keyboard interrupt by user to stop consuming
33
        raise KeyboardInterrupt
34
35
36
class FlakyReader(object):
37
    """
38
    A fake reader that throws exceptions to simulate
39
    connection errors
40
    """
41
    def __init__(self, message, num_messages):
42
        self.message = message
43
        self.num_messages = num_messages
44
45
    def read(self):
46
        # Yield the first half of messages
47
        for i in range(int(self.num_messages/2)):
48
            yield self.message
49
        # Simulate a connection error while reading
50
        try:
51
            raise Exception
52
        except Exception:
53
            # Continue like you don't care.
54
            # Yield the second half of messages
55
            for i in range(int(self.num_messages/2)):
56
                yield self.message
57
        # Simulate keyboard interrupt by user to stop consuming
58
        raise KeyboardInterrupt
59
60
61
class TestWorker(unittest.TestCase):
62
    """
63
    Tests for message worker.
64
    """
65
    def setUp(self):
66
        self.config = Config(10)
67
        self.encoder = echo_encoder.Encoder()
68
        self.writer = Mock()
69
        self.writer.write.return_value = True
70
71
    def test_flush(self):
72
        """
73
        Messages should be flushed to the writer when the buffer is full.
74
        """
75
        reader = DummyReader(["bar"], self.config.buffer_size)
76
        client = Worker(reader, self.encoder, self.writer, self.config)
77
        client.consume()
78
        self.assertTrue(self.writer.write.called)
79
        self.writer.write.assert_called_once_with(["bar"] * self.config.buffer_size)
80
81
    @timeout(0.1)
82
    def test_reconnect(self):
83
        """
84
        The worker should reconnect when the connection is interrupted.
85
        """
86
        reader = FlakyReader("baz", self.config.buffer_size)
87
        client = Worker(reader, self.encoder, self.writer, self.config)
88
        client.consume()
89
        self.assertTrue(self.writer.write.called)
90
        self.writer.write.assert_called_once_with(["baz"] * self.config.buffer_size)
91