Completed
Push — master ( 11b435...f96b64 )
by Matthias
53s
created

TestKafkaPython.create_reader()   A

Complexity

Conditions 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 7
rs 9.4285
1
import unittest
2
import mock
3
from kafka_influxdb.reader import kafka_python
4
from kafka.common import Message
5
6
7
class TestKafkaPython(unittest.TestCase):
    """Tests for ``kafka_python.Reader`` run against a mocked consumer."""

    def setUp(self):
        """Store the connection settings and build the reader under test."""
        self.host, self.port = "myhost", 1234
        self.group, self.topic = "mygroup", "mytopic"
        # Stored on the test case only; create_reader() does not pass it on.
        self.reconnect_wait_time = 0.01
        self.reader = self.create_reader()

    def create_reader(self):
        """Return a ``Reader`` whose ``consumer`` attribute is a MagicMock."""
        connection_args = (self.host, self.port, self.group, self.topic)
        mocked_reader = kafka_python.Reader(*connection_args)
        # Replace the consumer attribute with a mock the tests can configure.
        mocked_reader.consumer = mock.MagicMock()
        return mocked_reader

    def sample_messages(self, payload, count):
        """Build matching lists of input messages and expected payloads.

        Returns a 2-tuple: a list holding ``count`` references to one
        ``Message`` built with ``payload`` as its fourth positional argument,
        and a list holding ``count`` references to ``payload`` itself.
        """
        message = Message(0, 0, None, payload)
        raw_messages = [message] * count
        expected_payloads = [payload] * count
        return raw_messages, expected_payloads

    def test_handle_read(self):
        # _handle_read() is expected to yield one payload for each message
        # obtained by iterating over the (mocked) consumer.
        raw_messages, expected_payloads = self.sample_messages("hello", 3)
        self.reader._connect = mock.MagicMock()
        self.reader.consumer.__iter__.return_value = raw_messages
        received_payloads = list(self.reader._handle_read())
        self.assertEqual(received_payloads, expected_payloads)

    def receive_messages(self):
        """Yield each item produced by ``self.reader.read()``."""
        for received in self.reader.read():
            yield received
37