TestKafkaPython.receive_messages()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 3
rs 10
1
import unittest
2
import mock
3
from kafka_influxdb.reader import kafka_python
4
from kafka.common import Message
5
6
7
class TestKafkaPython(unittest.TestCase):
    """Exercise ``kafka_python.Reader`` with its consumer replaced by a mock."""

    def setUp(self):
        """Store the connection settings and build the reader under test."""
        self.host, self.port = "myhost", 1234
        self.group, self.topic = "mygroup", "mytopic"
        self.offset = "largest"
        # Only stored on the test case; create_reader() does not forward it.
        self.reconnect_wait_time = 0.01
        self.reader = self.create_reader()

    def create_reader(self):
        """Return a ``kafka_python.Reader`` whose ``consumer`` is a MagicMock."""
        connection_args = (
            self.host,
            self.port,
            self.group,
            self.topic,
            self.offset,
        )
        instance = kafka_python.Reader(*connection_args)
        instance.consumer = mock.MagicMock()
        return instance

    def sample_messages(self, payload, count):
        """Return ``(messages, payloads)``, each a list of length ``count``.

        ``messages`` repeats a single ``Message`` built with ``payload`` as
        its fourth positional field; ``payloads`` repeats ``payload`` itself.
        """
        message = Message(0, 0, None, payload)
        return [message] * count, [payload] * count

    def test_handle_read(self):
        """``_handle_read`` yields the payload of every consumed message."""
        raw_messages, extracted_messages = self.sample_messages("hello", 3)
        # Stub out the connection step and feed the messages through the
        # mocked consumer's iterator.
        self.reader._connect = mock.MagicMock()
        self.reader.consumer.__iter__.return_value = raw_messages
        received_messages = [item for item in self.reader._handle_read()]
        self.assertEqual(received_messages, extracted_messages)

    def receive_messages(self):
        """Lazily yield each message produced by ``self.reader.read()``."""
        for received in self.reader.read():
            yield received
39