1
|
|
|
import unittest |
2
|
|
|
from mock import Mock |
3
|
|
|
import random |
4
|
|
|
from kafka_influxdb.worker import Worker |
5
|
|
|
from kafka_influxdb.encoder import echo_encoder |
6
|
|
|
|
7
|
|
|
|
8
|
|
|
class Config: |
9
|
|
|
def __init__(self, buffer_size): |
10
|
|
|
self.buffer_size = buffer_size |
11
|
|
|
self.kafka_topic = "test" |
12
|
|
|
self.influxdb_dbname = "mydb" |
13
|
|
|
|
14
|
|
|
|
15
|
|
|
class DummyReader(object): |
16
|
|
|
""" |
17
|
|
|
A reader that yields dummy messages |
18
|
|
|
""" |
19
|
|
|
def __init__(self, messages, num_messages): |
20
|
|
|
self.messages = messages |
21
|
|
|
self.num_messages = num_messages |
22
|
|
|
|
23
|
|
|
def read(self): |
24
|
|
|
for i in range(self.num_messages): |
25
|
|
|
yield random.choice(self.messages) |
26
|
|
|
|
27
|
|
|
|
28
|
|
|
class FlakyReader(object): |
29
|
|
|
""" |
30
|
|
|
A fake reader that throws exceptions to simulate |
31
|
|
|
connection errors |
32
|
|
|
""" |
33
|
|
|
def __init__(self, message, num_messages): |
34
|
|
|
self.message = message |
35
|
|
|
self.num_messages = num_messages |
36
|
|
|
|
37
|
|
|
def read(self): |
38
|
|
|
# Yield the first half of messages |
39
|
|
|
for i in range(int(self.num_messages/2)): |
40
|
|
|
yield self.message |
41
|
|
|
# Simulate a connection error while reading |
42
|
|
|
try: |
43
|
|
|
raise Exception |
44
|
|
|
except Exception: |
45
|
|
|
# Continue like you don't care. |
46
|
|
|
# Yield the second half of messages |
47
|
|
|
for i in range(int(self.num_messages/2)): |
48
|
|
|
yield self.message |
49
|
|
|
|
50
|
|
|
class DummyWriter(object): |
51
|
|
|
""" |
52
|
|
|
A fake writer that does nothing with the input data |
53
|
|
|
""" |
54
|
|
|
def __init__(self): |
55
|
|
|
pass |
56
|
|
|
|
57
|
|
|
def write(self): |
58
|
|
|
pass |
59
|
|
|
|
60
|
|
|
|
61
|
|
|
class TestKafkaInfluxDB(unittest.TestCase): |
62
|
|
|
def setUp(self): |
63
|
|
|
self.config = Config(100) |
64
|
|
|
self.encoder = echo_encoder.Encoder() |
65
|
|
|
self.writer = DummyWriter() |
66
|
|
|
self.writer = Mock() |
67
|
|
|
self.writer.write.return_value = True |
68
|
|
|
|
69
|
|
|
def test_buffering(self): |
70
|
|
|
self.reader = DummyReader(["foo"], self.config.buffer_size - 1) |
71
|
|
|
self.client = Worker(self.reader, self.encoder, self.writer, self.config) |
72
|
|
|
self.client.consume() |
73
|
|
|
self.assertFalse(self.writer.write.called) |
74
|
|
|
|
75
|
|
|
def test_flush(self): |
76
|
|
|
self.reader = DummyReader(["bar"], self.config.buffer_size) |
77
|
|
|
self.client = Worker(self.reader, self.encoder, self.writer, self.config) |
78
|
|
|
self.client.consume() |
79
|
|
|
self.assertTrue(self.writer.write.called) |
80
|
|
|
|
81
|
|
|
def test_reconnect(self): |
82
|
|
|
self.reader = FlakyReader(["baz"], self.config.buffer_size) |
83
|
|
|
self.client = Worker(self.reader, self.encoder, self.writer, self.config) |
84
|
|
|
self.client.consume() |
85
|
|
|
self.assertTrue(self.writer.write.called) |
86
|
|
|
self.writer.write.assert_called_once_with(["baz"] * self.config.buffer_size) |
87
|
|
|
|