1
|
|
|
import unittest |
2
|
|
|
from mock import Mock |
3
|
|
|
import random |
4
|
|
|
import time |
5
|
|
|
from kafka_influxdb.worker import Worker |
6
|
|
|
from kafka_influxdb.encoder import echo_encoder |
7
|
|
|
from kafka_influxdb.tests.helpers.timeout import timeout |
8
|
|
|
from requests.exceptions import ConnectionError |
9
|
|
|
from mock import patch |
10
|
|
|
|
11
|
|
|
|
12
|
|
|
class Config:
    """
    Bare-bones configuration object for the tests in this module.

    Only the two buffer settings are supplied by the caller; the topic name,
    database name and statistics flag are fixed.
    """

    def __init__(self, buffer_size, buffer_timeout):
        # Fixed values shared by every test
        self.kafka_topic, self.influxdb_dbname = "test", "mydb"
        self.statistics = False
        # Caller-supplied buffer settings
        self.buffer_size, self.buffer_timeout = buffer_size, buffer_timeout
23
|
|
|
|
24
|
|
|
|
25
|
|
|
class DummyReader(object):
    """
    Fake reader that hands out messages picked at random from a fixed pool.
    """

    def __init__(self, messages, num_messages):
        self.num_messages = num_messages
        self.messages = messages

    def read(self):
        """
        Yield ``num_messages`` random picks from ``messages`` and then raise
        ``KeyboardInterrupt``, imitating a user who stops the consumer.
        """
        for _ in range(self.num_messages):
            picked = random.choice(self.messages)
            yield picked
        # All messages delivered: simulate the user pressing Ctrl-C
        raise KeyboardInterrupt
39
|
|
|
|
40
|
|
|
|
41
|
|
|
class TimeoutReader(object):
    """
    Fake reader that delivers all but one of the requested messages, then
    stalls for ``timeout`` before yielding ``False`` and exiting.
    """

    def __init__(self, message, num_messages, timeout):
        self.timeout = timeout
        self.num_messages = num_messages
        self.message = message

    def read(self):
        """
        Yield ``message`` ``num_messages - 1`` times, sleep for ``timeout``,
        yield ``False`` once and finally raise ``SystemExit``.
        """
        delivered = 0
        limit = self.num_messages - 1
        while delivered < limit:
            yield self.message
            delivered += 1
        # No further messages arrive: wait out the timeout period
        time.sleep(self.timeout)
        yield False
        # End consumption
        raise SystemExit
59
|
|
|
|
60
|
|
|
|
61
|
|
|
class FlakyReader(object):
    """
    Fake reader that raises (and swallows) an exception halfway through
    to imitate a connection error while reading.
    """

    def __init__(self, message, num_messages):
        self.num_messages = num_messages
        self.message = message

    def read(self):
        """
        Yield ``message`` for the first half, hit a simulated error, yield
        ``message`` for the second half and then raise ``KeyboardInterrupt``.

        Each half holds ``int(num_messages / 2)`` messages, so an odd
        ``num_messages`` produces one message fewer in total.
        """
        half = int(self.num_messages / 2)

        # First half
        for _ in range(half):
            yield self.message

        try:
            # Simulated connection error in the middle of reading
            raise Exception
        except Exception:
            # Carry on regardless and deliver the second half
            for _ in range(half):
                yield self.message

        # Simulate the user interrupting to stop consumption
        raise KeyboardInterrupt
85
|
|
|
|
86
|
|
|
|
87
|
|
|
class TestWorker(unittest.TestCase):
    """
    Worker tests driven by fake readers and a mocked writer.
    """

    def setUp(self):
        # Mocked writer whose write() always reports success
        self.writer = Mock()
        self.writer.write.return_value = True
        self.encoder = echo_encoder.Encoder()
        # buffer_size=10, buffer_timeout=0.1
        self.config = Config(10, 0.1)

    @patch('time.sleep', return_value=None)
    def test_create_database(self, _):
        """
        A connection error during database creation leads to a second attempt.
        """
        # First create_database() call raises ConnectionError, second succeeds
        failing_writer = Mock()
        failing_writer.create_database = Mock(
            side_effect=[ConnectionError, None])
        source = DummyReader(["bar"], self.config.buffer_size)

        worker = Worker(source, self.encoder, failing_writer, self.config)
        worker.consume()

        attempts = failing_writer.create_database.call_count
        self.assertEqual(attempts, 2)

    def test_flush(self):
        """
        A full buffer is handed to the writer in exactly one write call.
        """
        source = DummyReader(["bar"], self.config.buffer_size)
        worker = Worker(source, self.encoder, self.writer, self.config)
        worker.consume()

        expected_batch = ["bar"] * self.config.buffer_size
        self.assertTrue(self.writer.write.called)
        self.writer.write.assert_called_once_with(expected_batch)

    def test_buffer_timeout(self):
        """
        A buffer that is not yet full is flushed once it has timed out.
        """
        source = TimeoutReader(
            "bar", self.config.buffer_size, self.config.buffer_timeout)
        worker = Worker(source, self.encoder, self.writer, self.config)
        worker.consume()

        # The reader delivers one message fewer than the buffer holds
        expected_batch = ["bar"] * int(self.config.buffer_size-1)
        self.assertTrue(self.writer.write.called)
        self.writer.write.assert_called_once_with(expected_batch)

    @timeout(0.1)
    def test_reconnect(self):
        """
        An interrupted connection must not stop the worker from writing
        the complete batch.
        """
        source = FlakyReader("baz", self.config.buffer_size)
        worker = Worker(source, self.encoder, self.writer, self.config)
        worker.consume()

        expected_batch = ["baz"] * self.config.buffer_size
        self.assertTrue(self.writer.write.called)
        self.writer.write.assert_called_once_with(expected_batch)
148
|
|
|
|