Completed
Push — master ( 5cbcf8...d3d059 )
by Matthias
01:08
created

TestKafkaReader   A

Complexity

Total Complexity 6

Size/Duplication

Total Lines 41
Duplicated Lines 0 %

Importance

Changes 4
Bugs 0 Features 0
Metric Value
dl 0
loc 41
rs 10
c 4
b 0
f 0
wmc 6

5 Methods

Rating   Name   Duplication   Size   Complexity  
A setUp() 0 13 1
A sample_messages() 0 2 1
A test_handle_read() 0 6 1
A test_reconnect() 0 12 1
A receive_messages() 0 3 2
1
import unittest
2
import mock
3
from kafka_influxdb.reader import kafka_reader
4
from kafka.common import ConnectionError
5
from kafka.common import Message
6
from kafka_influxdb.tests.helpers.timeout import timeout
7
8
9
class TestKafkaReader(unittest.TestCase):
    """Unit tests for ``kafka_reader.KafkaReader`` driven by a mock consumer."""

    def setUp(self):
        """Create a reader with dummy settings and a scripted mock consumer."""
        self.host = "myhost"
        self.port = 1234
        self.group = "mygroup"
        self.topic = "mytopic"
        # Presumably the pause between reconnect attempts; kept tiny so the
        # tests stay fast -- TODO confirm against KafkaReader.
        self.reconnect_wait_time = 0.01

        self.reader = kafka_reader.KafkaReader(self.host,
                                               self.port,
                                               self.group,
                                               self.topic,
                                               self.reconnect_wait_time)
        # Stand-in consumer: each test scripts what iterating over it yields.
        self.reader.consumer = mock.MagicMock()

    def sample_messages(self, payload, count):
        """Build ``count`` identical Kafka messages and their expected payloads.

        Returns a 2-tuple ``(messages, payloads)`` where ``messages`` is a
        list of ``count`` ``Message(0, 0, None, payload)`` objects and
        ``payloads`` is a list of ``count`` copies of ``payload``.
        """
        return count * [Message(0, 0, None, payload)], count * [payload]

    def test_handle_read(self):
        """``_handle_read`` should yield the payload of every consumed message."""
        sample_messages, extracted_messages = self.sample_messages("hello", 3)
        self.reader.consumer.__iter__.return_value = sample_messages
        # Stub out connecting so that only the read path is exercised.
        self.reader._connect = mock.MagicMock()
        received_messages = list(self.reader._handle_read())
        self.assertEqual(received_messages, extracted_messages)

    @timeout(0.1)
    def test_reconnect(self):
        """
        In case of a connection error, the client should reconnect and
        start receiving messages again without interruption
        """
        sample_messages1, extracted_messages1 = self.sample_messages("hi", 3)
        sample_messages2, extracted_messages2 = self.sample_messages("world", 3)
        # NOTE(review): ConnectionError is placed in the sequence as an element
        # (the exception class itself); the mocked iterator does not raise it.
        # Confirm that KafkaReader.read() treats this as a connection failure.
        sample_messages = sample_messages1 + [ConnectionError] + sample_messages2
        self.reader.consumer.__iter__.return_value = sample_messages
        # NOTE(review): unlike test_handle_read, _connect is not mocked here --
        # verify that read() does not attempt a real connection.
        received_messages = list(self.receive_messages())
        self.assertEqual(received_messages, extracted_messages1 + extracted_messages2)

    def receive_messages(self):
        """Yield each message produced by ``self.reader.read()``."""
        for message in self.reader.read():
            yield message
50