Completed
Push — master ( 5cbcf8...d3d059 )
by Matthias
01:08
created

kafka_influxdb.tests.writer_test.TestInfluxDBWriter   A

Complexity

Total Complexity 4

Size/Duplication

Total Lines 47
Duplicated Lines 0 %
Metric Value
dl 0
loc 47
rs 10
wmc 4

4 Methods

Rating   Name   Duplication   Size   Complexity  
A TestInfluxDBWriter.create_writer() 0 14 1
A TestInfluxDBWriter.setUp() 0 7 1
A TestInfluxDBWriter.test_write() 0 15 1
A TestInfluxDBWriter.test_ssl_connection() 0 7 1
1
import unittest
2
from mock import MagicMock
3
from kafka_influxdb.writer import influxdb_writer
4
import influxdb
5
6
7
class TestInfluxDBWriter(unittest.TestCase):
8
    def setUp(self):
9
        self.host = "myhost",
10
        self.port = 1234
11
        self.user = "test"
12
        self.password = "test"
13
        self.dbname = "mydb"
14
        self.verify_ssl = False
15
16
    def create_writer(self, use_ssl=False, use_udp=False, timeout=None):
17
        self.use_ssl = use_ssl
18
        self.use_udp = use_udp
19
        self.timeout = timeout
20
        return influxdb_writer.InfluxDBWriter(self.host,
21
                                              self.port,
22
                                              self.user,
23
                                              self.password,
24
                                              self.dbname,
25
                                              self.use_ssl,
26
                                              self.verify_ssl,
27
                                              self.timeout,
28
                                              self.use_udp,
29
                                              self.port)
30
31
    def test_write(self):
32
        writer = self.create_writer()
33
        writer.client = MagicMock()
34
        writer.write([
35
            "cpu,host=server01,region=uswest value=1.0 1434055562000",
36
            "cpu,host=server02,region=uswest value=2.0 1434055562005"
37
        ])
38
        writer.client.request.assert_called_once_with(url='write',
39
                                                      expected_response_code=204,
40
                                                      headers={'Content-type': 'application/octet-stream',
41
                                                               'Accept': 'text/plain'},
42
                                                      params={'rp': 1234, 'db': 'mydb'},
43
                                                      data='cpu,host=server01,region=uswest value=1.0 1434055562000\n'
44
                                                      'cpu,host=server02,region=uswest value=2.0 1434055562005',
45
                                                      method='POST')
46
47
    def test_ssl_connection(self):
48
        influxdb.InfluxDBClient = MagicMock()
49
        writer = self.create_writer(True)
50
        writer.client = MagicMock()
51
        influxdb.InfluxDBClient.assert_called_once_with(
52
            self.host, self.port, self.user, self.password, self.dbname,
53
            self.use_ssl, self.verify_ssl, self.timeout, self.use_udp, self.port
54
        )
55