Completed
Push — master ( 5cbcf8...d3d059 )
by Matthias
01:08
created

kafka_influxdb.writer.KafkaSampleWriter   A

Complexity

Total Complexity 13

Size/Duplication

Total Lines 63
Duplicated Lines 0 %
Metric Value
dl 0
loc 63
rs 10
wmc 13

8 Methods

Rating   Name   Duplication   Size   Complexity  
A KafkaSampleWriter._get_partitions() 0 2 1
A KafkaSampleWriter.produce_messages() 0 14 3
A KafkaSampleWriter.__init__() 0 10 1
A KafkaSampleWriter._create_kafka_client() 0 4 1
A KafkaSampleWriter._send_requests() 0 7 2
A KafkaSampleWriter._create_request() 0 3 1
A KafkaSampleWriter._create_random_messages() 0 3 2
A KafkaSampleWriter._send_request_batches() 0 6 2
1
import kafka
2
import kafka.common
3
import random
4
import logging
5
import time
6
7
8
class KafkaWriterException(Exception):
    """Error raised by the Kafka sample writer, e.g. when a topic has no partitions."""
10
11
12
class KafkaSampleWriter(object):
    """
    KafkaSampleWriter can be used to write sample messages into Kafka for
    benchmark purposes.

    One batch of random sample messages is built once and then re-sent to
    every partition of the topic for the requested number of batches.
    """

    def __init__(self, host, port, topic):
        """
        Create the Kafka client and set up the pool of sample payloads.

        :param host: host of the Kafka broker
        :param port: port of the Kafka broker
        :param topic: topic the sample messages are produced to
        """
        self.kafka_client = self._create_kafka_client(host, port)
        self.topic = topic

        # Pool of payloads that random messages are drawn from.
        # NOTE(review): the continuation lines of the multi-line sample keep
        # their source indentation inside the payload - confirm that the
        # consumers of these messages tolerate the leading whitespace.
        self.sample_messages = [
            b"""26f2fc918f50.load.load.shortterm 0.05 1436357630
            26f2fc918f50.load.load.midterm 0.05 1436357630
            26f2fc918f50.load.load.longterm 0.05 1436357630""",
            b"26f2fc918f50.cpu-0.cpu-user 30364 1436357630",
            b"26f2fc918f50.memory.memory-buffered 743657472 1436357630"
        ]

    def produce_messages(self, batches=1000, batch_size=1000):
        """
        Produce Kafka sample messages

        The Kafka client is closed once all batches have been sent.

        :param batches: number of message batches
        :param batch_size: messages per batch
        :return: None
        :raises KafkaWriterException: if no partitions are found for the topic
        """
        partitions = self._get_partitions(self.topic)
        if not partitions:
            raise KafkaWriterException("No partitions found for %s" % self.topic)

        # The same random batch is reused for every partition and every round.
        kafka_messages = self._create_random_messages(self.sample_messages, batch_size)
        kafka_requests = [self._create_request(self.topic, p, kafka_messages) for p in partitions]
        self._send_request_batches(kafka_requests, batches, batch_size)

    def _send_request_batches(self, kafka_requests, batches, batch_size):
        """
        Send the prepared requests ``batches`` times and close the client.

        :param kafka_requests: produce requests sent in every batch
        :param batches: number of times the requests are sent
        :param batch_size: messages per batch, used for progress logging only
        """
        total_messages = batches * batch_size
        try:
            # Count completed batches (1-based) so that the progress line
            # reflects the batch that was just sent and ends at the total.
            for sent_batches in range(1, batches + 1):
                self._send_requests(kafka_requests)
                logging.info("Sent %s out of %s messages", sent_batches * batch_size, total_messages)
        finally:
            # Release the client even if a send raises an unexpected error.
            self.kafka_client.close()

    def _send_requests(self, requests):
        """
        Send one set of produce requests.

        An unknown topic or partition error is logged and not re-raised;
        after a one second pause the client is closed. Any other error
        propagates to the caller.

        :param requests: produce requests to send
        """
        try:
            self.kafka_client.send_produce_request(payloads=requests, fail_on_error=True)
        except kafka.common.UnknownTopicOrPartitionError as e:
            logging.error(e)
            time.sleep(1)
            # NOTE(review): presumably closing forces a fresh connection on
            # the next send - confirm against the Kafka client in use.
            self.kafka_client.close()

    def _get_partitions(self, topic):
        """
        Look up the partition ids of a topic.

        :param topic: topic name
        :return: whatever the client reports as partition ids for the topic
        """
        return self.kafka_client.get_partition_ids_for_topic(topic)

    @staticmethod
    def _create_kafka_client(host, port):
        """
        Create a Kafka client for the broker at ``host:port``.

        :param host: host of the Kafka broker
        :param port: port of the Kafka broker
        :return: a new ``kafka.KafkaClient``
        """
        logging.info("Connecting to Kafka broker at %s:%s", host, port)
        return kafka.KafkaClient("{}:{}".format(host, port))

    @staticmethod
    def _create_random_messages(messages, count):
        """
        Build ``count`` Kafka messages, each with a randomly chosen payload.

        :param messages: pool of payloads to choose from
        :param count: number of messages to create
        :return: list of Kafka messages
        """
        return [kafka.create_message(random.choice(messages)) for _ in range(count)]

    @staticmethod
    def _create_request(topic, partition, messages):
        """
        Wrap messages into a produce request for one topic partition.

        :param topic: topic name
        :param partition: partition id
        :param messages: Kafka messages to send
        :return: a ``kafka.common.ProduceRequest``
        """
        return kafka.common.ProduceRequest(topic=topic, partition=partition, messages=messages)
75