1
|
|
|
import unittest |
2
|
|
|
from mock import Mock |
3
|
|
|
import random |
4
|
|
|
import time |
5
|
|
|
from kafka_influxdb.worker import Worker |
6
|
|
|
from kafka_influxdb.encoder import echo_encoder |
7
|
|
|
from kafka_influxdb.tests.helpers.timeout import timeout |
8
|
|
|
from requests.exceptions import ConnectionError |
9
|
|
|
from influxdb.exceptions import InfluxDBServerError, InfluxDBClientError |
10
|
|
|
|
11
|
|
|
|
12
|
|
|
class Config: |
13
|
|
|
""" |
14
|
|
|
Dummy config with minimum settings to pass the tests |
15
|
|
|
""" |
16
|
|
|
def __init__(self, buffer_size, buffer_timeout): |
17
|
|
|
self.buffer_size = buffer_size |
18
|
|
|
self.buffer_timeout = buffer_timeout |
19
|
|
|
self.kafka_topic = "test" |
20
|
|
|
self.influxdb_dbname = "mydb" |
21
|
|
|
self.statistics = False |
22
|
|
|
|
23
|
|
|
|
24
|
|
|
class DummyReader(object): |
25
|
|
|
""" |
26
|
|
|
A reader that yields dummy messages |
27
|
|
|
""" |
28
|
|
|
def __init__(self, messages, num_messages): |
29
|
|
|
self.messages = messages |
30
|
|
|
self.num_messages = num_messages |
31
|
|
|
|
32
|
|
|
def read(self): |
33
|
|
|
for i in range(self.num_messages): |
34
|
|
|
yield random.choice(self.messages) |
35
|
|
|
# Simulate keyboard interrupt by user to stop consuming |
36
|
|
|
raise KeyboardInterrupt |
37
|
|
|
|
38
|
|
|
class TimeoutReader(object): |
39
|
|
|
""" |
40
|
|
|
A reader that writes half the messages then times out |
41
|
|
|
""" |
42
|
|
|
def __init__(self, message, num_messages, timeout): |
43
|
|
|
self.message = message |
44
|
|
|
self.num_messages = num_messages |
45
|
|
|
self.timeout = timeout |
46
|
|
|
|
47
|
|
|
def read(self): |
48
|
|
|
for i in range(self.num_messages-1): |
49
|
|
|
yield self.message |
50
|
|
|
# Simulate no additional messages causing timeout |
51
|
|
|
time.sleep(self.timeout) |
52
|
|
|
yield False |
53
|
|
|
# Stop consuming |
54
|
|
|
raise SystemExit |
55
|
|
|
|
56
|
|
|
class FlakyReader(object): |
57
|
|
|
""" |
58
|
|
|
A fake reader that throws exceptions to simulate |
59
|
|
|
connection errors |
60
|
|
|
""" |
61
|
|
|
def __init__(self, message, num_messages): |
62
|
|
|
self.message = message |
63
|
|
|
self.num_messages = num_messages |
64
|
|
|
|
65
|
|
|
def read(self): |
66
|
|
|
# Yield the first half of messages |
67
|
|
|
for i in range(int(self.num_messages/2)): |
68
|
|
|
yield self.message |
69
|
|
|
# Simulate a connection error while reading |
70
|
|
|
try: |
71
|
|
|
raise Exception |
72
|
|
|
except Exception: |
73
|
|
|
# Continue like you don't care. |
74
|
|
|
# Yield the second half of messages |
75
|
|
|
for i in range(int(self.num_messages/2)): |
76
|
|
|
yield self.message |
77
|
|
|
# Simulate keyboard interrupt by user to stop consuming |
78
|
|
|
raise KeyboardInterrupt |
79
|
|
|
|
80
|
|
|
|
81
|
|
|
class TestWorker(unittest.TestCase): |
82
|
|
|
""" |
83
|
|
|
Tests for message worker. |
84
|
|
|
""" |
85
|
|
|
def setUp(self): |
86
|
|
|
self.config = Config(10, 0.1) |
87
|
|
|
self.encoder = echo_encoder.Encoder() |
88
|
|
|
self.writer = Mock() |
89
|
|
|
self.writer.write.return_value = True |
90
|
|
|
|
91
|
|
|
def test_create_database(self): |
92
|
|
|
""" |
93
|
|
|
Retry creating the InfluxDB database in case of a connection error |
94
|
|
|
""" |
95
|
|
|
reader = DummyReader(["bar"], self.config.buffer_size) |
96
|
|
|
writer_with_connection_error = Mock() |
97
|
|
|
writer_with_connection_error.create_database = Mock(side_effect=[ConnectionError, None]) |
98
|
|
|
|
99
|
|
|
client = Worker(reader, self.encoder, writer_with_connection_error, self.config) |
100
|
|
|
client.consume() |
101
|
|
|
self.assertEqual(writer_with_connection_error.create_database.call_count, 2) |
102
|
|
|
|
103
|
|
|
def test_flush(self): |
104
|
|
|
""" |
105
|
|
|
Messages should be flushed to the writer when the buffer is full. |
106
|
|
|
""" |
107
|
|
|
reader = DummyReader(["bar"], self.config.buffer_size) |
108
|
|
|
client = Worker(reader, self.encoder, self.writer, self.config) |
109
|
|
|
client.consume() |
110
|
|
|
self.assertTrue(self.writer.write.called) |
111
|
|
|
self.writer.write.assert_called_once_with(["bar"] * self.config.buffer_size) |
112
|
|
|
|
113
|
|
|
def test_buffer_timeout(self): |
114
|
|
|
""" |
115
|
|
|
If the buffer has timed out flush to the writer. |
116
|
|
|
""" |
117
|
|
|
reader = TimeoutReader("bar", self.config.buffer_size, self.config.buffer_timeout) |
118
|
|
|
client = Worker(reader, self.encoder, self.writer, self.config) |
119
|
|
|
client.consume() |
120
|
|
|
self.assertTrue(self.writer.write.called) |
121
|
|
|
self.writer.write.assert_called_once_with(["bar"] * int(self.config.buffer_size-1)) |
122
|
|
|
|
123
|
|
|
@timeout(0.1) |
124
|
|
|
def test_reconnect(self): |
125
|
|
|
""" |
126
|
|
|
The worker should reconnect when the connection is interrupted. |
127
|
|
|
""" |
128
|
|
|
reader = FlakyReader("baz", self.config.buffer_size) |
129
|
|
|
client = Worker(reader, self.encoder, self.writer, self.config) |
130
|
|
|
client.consume() |
131
|
|
|
self.assertTrue(self.writer.write.called) |
132
|
|
|
self.writer.write.assert_called_once_with(["baz"] * self.config.buffer_size) |
133
|
|
|
|