Completed
Push — master ( 5cbcf8...d3d059 )
by Matthias
01:08
created

kafka_influxdb.tests.FlakyReader   A

Complexity

Total Complexity 5

Size/Duplication

Total Lines 21
Duplicated Lines 0 %
Metric Value
dl 0
loc 21
rs 10
wmc 5

2 Methods

Rating   Name   Duplication   Size   Complexity  
A FlakyReader.read() 0 12 4
A FlakyReader.__init__() 0 3 1
1
import unittest
2
from mock import Mock
3
import random
4
from kafka_influxdb.worker import Worker
5
from kafka_influxdb.encoder import echo_encoder
6
7
8
class Config(object):
    """
    Minimal configuration object for the tests.

    Only ``buffer_size`` is configurable; the Kafka topic and the
    InfluxDB database name are fixed test values.
    """
    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.kafka_topic = "test"
        self.influxdb_dbname = "mydb"
13
14
15
class DummyReader(object):
    """
    A reader that yields dummy messages

    Each call to ``read()`` produces ``num_messages`` items, every one
    picked at random from ``messages``.
    """
    def __init__(self, messages, num_messages):
        self.messages = messages
        self.num_messages = num_messages

    def read(self):
        """
        Yield ``num_messages`` randomly chosen entries of ``messages``.
        """
        # The loop counter itself is irrelevant; only the number of
        # iterations matters.
        for _ in range(self.num_messages):
            yield random.choice(self.messages)
26
27
28
class FlakyReader(object):
    """
    A fake reader that throws exceptions to simulate
    connection errors

    ``read()`` yields ``message`` exactly ``num_messages`` times in
    total. Halfway through, an exception is raised and handled inside
    the generator to mimic a reader that recovers from a failure.
    """
    def __init__(self, message, num_messages):
        self.message = message
        self.num_messages = num_messages

    def read(self):
        """
        Yield ``message`` ``num_messages`` times, with a simulated
        (internally handled) error between the two halves.
        """
        total = int(self.num_messages)
        first_half = total // 2
        # Give the remainder to the second half so that an odd total
        # is not silently truncated by one message.
        second_half = total - first_half

        # Yield the first half of messages
        for _ in range(first_half):
            yield self.message
        # Simulate a connection error while reading
        try:
            raise Exception
        except Exception:
            # Continue like you don't care.
            # Yield the second half of messages
            for _ in range(second_half):
                yield self.message
49
50
class DummyWriter(object):
    """
    A fake writer that does nothing with the input data
    """
    def __init__(self):
        pass

    def write(self, msg=None):
        """
        Accept a payload and discard it.

        ``msg`` is optional so that existing zero-argument calls keep
        working, while calls that pass the data to be written no longer
        fail with a ``TypeError``.
        """
        pass
59
60
61
class TestKafkaInfluxDB(unittest.TestCase):
    """
    Tests for the Worker using fake readers and a mocked writer.
    """
    def setUp(self):
        self.config = Config(100)
        self.encoder = echo_encoder.Encoder()
        # A Mock is used as the writer so the tests can inspect whether
        # and how write() was called.
        self.writer = Mock()
        self.writer.write.return_value = True

    def test_buffering(self):
        """
        With one message fewer than the buffer size, write() must not
        have been called.
        """
        self.reader = DummyReader(["foo"], self.config.buffer_size - 1)
        self.client = Worker(self.reader, self.encoder, self.writer, self.config)
        self.client.consume()
        self.assertFalse(self.writer.write.called)

    def test_flush(self):
        """
        With exactly buffer_size messages, write() must have been called.
        """
        self.reader = DummyReader(["bar"], self.config.buffer_size)
        self.client = Worker(self.reader, self.encoder, self.writer, self.config)
        self.client.consume()
        self.assertTrue(self.writer.write.called)

    def test_reconnect(self):
        """
        With a reader that hits a simulated error mid-stream, write()
        must still be called exactly once with the full buffer.
        """
        self.reader = FlakyReader(["baz"], self.config.buffer_size)
        self.client = Worker(self.reader, self.encoder, self.writer, self.config)
        self.client.consume()
        self.assertTrue(self.writer.write.called)
        self.writer.write.assert_called_once_with(["baz"] * self.config.buffer_size)
87