Completed
Push — master ( 11b435...f96b64 )
by Matthias
53s
created

TestConfluentKafka.create_kafka_message()   A

Complexity

Conditions 2

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 12
rs 9.4285
1
import unittest
2
import mock
3
import pytest
4
5
# Import the confluent reader module under test; if the import fails,
# pytest skips this module instead of reporting an error.
confluent = pytest.importorskip('kafka_influxdb.reader.confluent')
6
7
8
class KafkaError(object):
    """
    Error types raised by confluent kafka.

    Test stand-in that only carries the error codes the tests refer to.
    The codes are class attributes, so they can be read either from the
    class itself or from an instance.
    """

    # Code used by the tests for a partition-EOF error.
    # NOTE(review): not checked against the real library's constant.
    _PARTITION_EOF = 1
14
15
16
class TestConfluentKafka(unittest.TestCase):
17
    def setUp(self):
18
        self.host = "myhost"
19
        self.port = 1234
20
        self.group = "mygroup"
21
        self.topic = "mytopic"
22
        self.reconnect_wait_time = 0.01
23
        self.reader = self.create_reader()
24
25
    def create_reader(self):
        """
        Return a confluent Reader whose consumer is replaced by a MagicMock.

        The reader is built from the host, port, group and topic stored on
        the test case.
        """
        connection_args = (self.host, self.port, self.group, self.topic)
        instance = confluent.Reader(*connection_args)
        # Swap in a mock so tests can script what the consumer returns.
        instance.consumer = mock.MagicMock()
        return instance
32
33
    @staticmethod
34
    def create_kafka_message(key, value, kafka_error_type=None):
35
        message = mock.MagicMock()
36
        message.return_value = True
37
        message.key.return_value = key
38
        message.value.return_value = value
39
        if kafka_error_type:
40
            message.error.return_value = True
41
            message.error.code.return_value = kafka_error_type
42
        else:
43
            message.error.return_value = False
44
        return message
45
46
    def sample_messages(self, payload, count):
47
        message = self.create_kafka_message(None, payload)
48
        return count * [message], count * [payload]
49
50
    def test_handle_read(self):
        """
        _handle_read() should yield the payload of every polled message.
        """
        polled, expected = self.sample_messages("hello", 3)
        consumer = self.reader.consumer
        # Each poll() call returns the next prepared message.
        # NOTE(review): once the list is exhausted the mock raises
        # StopIteration; how _handle_read() ends then depends on the reader
        # implementation, which is not visible here -- confirm.
        consumer.poll.side_effect = polled
        self.reader._connect = mock.MagicMock()
        received = list(self.reader._handle_read())
        self.assertEqual(received, expected)
56
57
    def receive_messages(self):
58
        for message in self.reader.read():
59
            yield message
60