Completed
Push — master ( 5cbcf8...d3d059 )
by Matthias
01:08
created

TestKafkaReader.sample_messages()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
dl 0
loc 2
rs 10
c 1
b 0
f 0
1
import unittest
2
import mock
3
from kafka_influxdb.reader import kafka_reader
4
from kafka.common import ConnectionError
5
from kafka.common import Message
6
from kafka_influxdb.tests.helpers.timeout import timeout
7
8
9
class TestKafkaReader(unittest.TestCase):
    """Tests for ``kafka_reader.KafkaReader`` driven by a mocked consumer."""

    def setUp(self):
        """Build a reader from fixed settings and swap in a mock consumer."""
        self.host = "myhost"
        self.port = 1234
        self.group = "mygroup"
        self.topic = "mytopic"
        self.reconnect_wait_time = 0.01

        reader = kafka_reader.KafkaReader(
            self.host,
            self.port,
            self.group,
            self.topic,
            self.reconnect_wait_time,
        )
        # The tests decide what iterating the consumer produces by
        # configuring ``__iter__`` on this mock.
        reader.consumer = mock.MagicMock()
        self.reader = reader

    def sample_messages(self, payload, count):
        """
        Build matching input and expected-output fixtures.

        Returns a 2-tuple: a list holding ``count`` references to one
        ``Message`` whose value is ``payload``, and a list holding
        ``count`` copies of ``payload`` itself.
        """
        message = Message(0, 0, None, payload)
        return [message] * count, [payload] * count

    def test_handle_read(self):
        """``_handle_read`` should yield the payload of each consumed message."""
        consumed, expected = self.sample_messages("hello", 3)
        self.reader.consumer.__iter__.return_value = consumed
        # Stub out connecting so only the read path is exercised.
        self.reader._connect = mock.MagicMock()
        received = list(self.reader._handle_read())
        self.assertEqual(received, expected)

    @timeout(0.1)
    def test_reconnect(self):
        """
        After a connection error the client is expected to reconnect and
        keep delivering messages, so none of the payloads are lost.
        """
        first_batch, first_expected = self.sample_messages("hi", 3)
        second_batch, second_expected = self.sample_messages("world", 3)
        # The ConnectionError class itself sits between the two batches as
        # an element of the consumed stream.
        stream = first_batch + [ConnectionError] + second_batch
        self.reader.consumer.__iter__.return_value = stream
        received = list(self.receive_messages())
        self.assertEqual(received, first_expected + second_expected)

    def receive_messages(self):
        """Yield each item produced by ``self.reader.read()``."""
        for received in self.reader.read():
            yield received
50