KafkaError   A
last analyzed

Complexity

Total Complexity 1

Size/Duplication

Total Lines 6
Duplicated Lines 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
c 3
b 0
f 0
dl 0
loc 6
rs 10
wmc 1

1 Method

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 5 1
1
import unittest
2
import mock
3
import pytest
4
5
# Import the reader module under test; pytest.importorskip skips this whole
# test module when the import fails instead of reporting an error.
confluent = pytest.importorskip('kafka_influxdb.reader.confluent')
6
7
8
class KafkaError(object):
    """
    Error types raised by confluent kafka

    Test stand-in. The code is a class-level constant so it can be read as
    ``KafkaError._PARTITION_EOF`` without creating an instance; reading it
    from an instance keeps working as before.
    """

    # Value chosen by this test module; not necessarily the library's real code.
    _PARTITION_EOF = 1
14
15
16
class TestConfluentKafka(unittest.TestCase):
17
    def setUp(self):
18
        self.host = "myhost"
19
        self.port = 1234
20
        self.group = "mygroup"
21
        self.topic = "mytopic"
22
        self.offset = "largest"
23
        self.reconnect_wait_time = 0.01
24
        self.reader = self.create_reader()
25
26
    def create_reader(self):
27
        reader = confluent.Reader(self.host,
28
                                  self.port,
29
                                  self.group,
30
                                  self.topic,
31
                                  self.offset)
32
        reader.consumer = mock.MagicMock()
33
        return reader
34
35
    @staticmethod
36
    def create_kafka_message(key, value, kafka_error_type=None):
37
        message = mock.MagicMock()
38
        message.return_value = True
39
        message.key.return_value = key
40
        message.value.return_value = value
41
        if kafka_error_type:
42
            message.error.return_value = True
43
            message.error.code.return_value = kafka_error_type
44
        else:
45
            message.error.return_value = False
46
        return message
47
48
    def sample_messages(self, payload, count):
49
        message = self.create_kafka_message(None, payload)
50
        return count * [message], count * [payload]
51
52
    def test_handle_read(self):
53
        sample_messages, extracted_messages = self.sample_messages("hello", 3)
54
        self.reader.consumer.poll.side_effect = sample_messages
55
        self.reader._connect = mock.MagicMock()
56
        received_messages = list(self.reader._handle_read())
57
        self.assertEqual(received_messages, extracted_messages)
58
59
    def receive_messages(self):
60
        for message in self.reader.read():
61
            yield message
62