1
|
|
|
import unittest |
2
|
|
|
from mock import Mock |
3
|
|
|
import random |
4
|
|
|
from kafka_influxdb.worker import Worker |
5
|
|
|
from kafka_influxdb.encoder import echo_encoder |
6
|
|
|
from kafka_influxdb.tests.helpers.timeout import timeout |
7
|
|
|
from kafka.common import ConnectionError |
8
|
|
|
|
9
|
|
|
|
10
|
|
|
class Config: |
11
|
|
|
""" |
12
|
|
|
Dummy config with minimum settings to pass the tests |
13
|
|
|
""" |
14
|
|
|
def __init__(self, buffer_size): |
15
|
|
|
self.buffer_size = buffer_size |
16
|
|
|
self.kafka_topic = "test" |
17
|
|
|
self.influxdb_dbname = "mydb" |
18
|
|
|
self.statistics = False |
19
|
|
|
|
20
|
|
|
|
21
|
|
|
class DummyReader(object): |
22
|
|
|
""" |
23
|
|
|
A reader that yields dummy messages |
24
|
|
|
""" |
25
|
|
|
def __init__(self, messages, num_messages): |
26
|
|
|
self.messages = messages |
27
|
|
|
self.num_messages = num_messages |
28
|
|
|
|
29
|
|
|
def read(self): |
30
|
|
|
for i in range(self.num_messages): |
31
|
|
|
yield random.choice(self.messages) |
32
|
|
|
# Simulate keyboard interrupt by user to stop consuming |
33
|
|
|
raise KeyboardInterrupt |
34
|
|
|
|
35
|
|
|
|
36
|
|
|
class FlakyReader(object): |
37
|
|
|
""" |
38
|
|
|
A fake reader that throws exceptions to simulate |
39
|
|
|
connection errors |
40
|
|
|
""" |
41
|
|
|
def __init__(self, message, num_messages): |
42
|
|
|
self.message = message |
43
|
|
|
self.num_messages = num_messages |
44
|
|
|
|
45
|
|
|
def read(self): |
46
|
|
|
# Yield the first half of messages |
47
|
|
|
for i in range(int(self.num_messages/2)): |
48
|
|
|
yield self.message |
49
|
|
|
# Simulate a connection error while reading |
50
|
|
|
try: |
51
|
|
|
raise Exception |
52
|
|
|
except Exception: |
53
|
|
|
# Continue like you don't care. |
54
|
|
|
# Yield the second half of messages |
55
|
|
|
for i in range(int(self.num_messages/2)): |
56
|
|
|
yield self.message |
57
|
|
|
# Simulate keyboard interrupt by user to stop consuming |
58
|
|
|
raise KeyboardInterrupt |
59
|
|
|
|
60
|
|
|
|
61
|
|
|
class TestWorker(unittest.TestCase): |
62
|
|
|
""" |
63
|
|
|
Tests for message worker. |
64
|
|
|
""" |
65
|
|
|
def setUp(self): |
66
|
|
|
self.config = Config(10) |
67
|
|
|
self.encoder = echo_encoder.Encoder() |
68
|
|
|
self.writer = Mock() |
69
|
|
|
self.writer.write.return_value = True |
70
|
|
|
|
71
|
|
|
def test_flush(self): |
72
|
|
|
""" |
73
|
|
|
Messages should be flushed to the writer when the buffer is full. |
74
|
|
|
""" |
75
|
|
|
reader = DummyReader(["bar"], self.config.buffer_size) |
76
|
|
|
client = Worker(reader, self.encoder, self.writer, self.config) |
77
|
|
|
client.consume() |
78
|
|
|
self.assertTrue(self.writer.write.called) |
79
|
|
|
self.writer.write.assert_called_once_with(["bar"] * self.config.buffer_size) |
80
|
|
|
|
81
|
|
|
@timeout(0.1) |
82
|
|
|
def test_reconnect(self): |
83
|
|
|
""" |
84
|
|
|
The worker should reconnect when the connection is interrupted. |
85
|
|
|
""" |
86
|
|
|
reader = FlakyReader("baz", self.config.buffer_size) |
87
|
|
|
client = Worker(reader, self.encoder, self.writer, self.config) |
88
|
|
|
client.consume() |
89
|
|
|
self.assertTrue(self.writer.write.called) |
90
|
|
|
self.writer.write.assert_called_once_with(["baz"] * self.config.buffer_size) |
91
|
|
|
|