Completed
Push — master ( 5cbcf8...d3d059 )
by Matthias
01:08
created

kafka_influxdb.writer.InfluxDBWriter.write()   B

Complexity

Conditions 3

Size

Total Lines 25

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 3
dl 0
loc 25
rs 8.8571
1
# -*- coding: utf-8 -*-
2
3
import logging
4
import influxdb
5
6
7
class InfluxDBWriter(object):
    """
    Write line protocol messages to an InfluxDB database.

    Wraps an ``influxdb.InfluxDBClient`` and posts raw payloads to the
    ``write`` endpoint using the query parameters built in ``__init__``.
    """

    # Headers sent with every write request.
    DEFAULT_HEADERS = {
        'Content-type': 'application/octet-stream',
        'Accept': 'text/plain'
    }

    def __init__(self,
                 host,
                 port,
                 user,
                 password,
                 dbname,
                 use_ssl=False,
                 verify_ssl=False,
                 timeout=5,
                 use_udp=False,
                 retention_policy=None,
                 time_precision=None):
        """
        Initialize InfluxDB writer

        :param host: InfluxDB host name
        :param port: InfluxDB port (also handed to the client as UDP port)
        :param user: user name for authentication
        :param password: password for authentication
        :param dbname: database that write requests target (``db`` parameter)
        :param use_ssl: passed to the client as its ``ssl`` flag
        :param verify_ssl: passed to the client as its ``verify_ssl`` flag
        :param timeout: passed to the client as its ``timeout``
        :param use_udp: passed to the client as its ``use_udp`` flag
        :param retention_policy: if set, sent as the ``rp`` write parameter
        :param time_precision: if set, sent as the ``precision`` write parameter
        """
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.dbname = dbname
        self.use_ssl = use_ssl
        self.verify_ssl = verify_ssl
        self.timeout = timeout
        self.use_udp = use_udp
        self.retention_policy = retention_policy
        self.time_precision = time_precision

        # Default query parameters for write(); optional entries are only
        # added when the caller supplied a value.
        self.params = {'db': self.dbname}
        # Copy the defaults so that changing one writer's headers never
        # mutates the dict shared by the class and all other instances.
        self.headers = dict(self.DEFAULT_HEADERS)
        if time_precision:
            self.params['precision'] = time_precision
        if retention_policy:
            self.params['rp'] = retention_policy

        logging.info("Connecting to InfluxDB at %s:%s (SSL: %r, UDP: %r)", host, port, use_ssl, use_udp)
        self.client = self.create_client()
        # NOTE(review): the message below announces database creation, but
        # create_database() is not called here. Left unchanged to avoid adding
        # network I/O to the constructor - confirm the intended behaviour.
        logging.info("Creating database %s if not exists", dbname)

    def create_client(self):
        """
        Create an InfluxDB client

        Arguments are passed by keyword: the positional order of
        ``InfluxDBClient`` parameters is not stable across client releases
        (newer ones insert ``retries`` after ``timeout``), so positional
        passing would silently shift ``use_udp`` and the UDP port.

        :return: a new ``influxdb.InfluxDBClient`` for this writer's settings
        """
        return influxdb.InfluxDBClient(host=self.host,
                                       port=self.port,
                                       username=self.user,
                                       password=self.password,
                                       database=self.dbname,
                                       ssl=self.use_ssl,
                                       verify_ssl=self.verify_ssl,
                                       timeout=self.timeout,
                                       use_udp=self.use_udp,
                                       # Same port is used for UDP writes
                                       udp_port=self.port)

    def create_database(self, dbname):
        """
        Initialize the given database

        :param dbname: name of the database to create via the client
        """
        self.client.create_database(dbname)

    def write(self, msg, params=None, expected_response_code=204):
        """
        Write messages to InfluxDB database.
        Expects messages in line protocol format.
        See https://influxdb.com/docs/v0.9/write_protocols/line.html

        :param msg: iterable of line protocol strings (joined with newlines),
                    or a single ready-made string/bytes payload sent as is
        :param params: query parameters for the request; the defaults built
                       in ``__init__`` are used when this is empty or None
        :param expected_response_code: status code passed on to the client
        :return: True if the request succeeded, False if any error occurred
        """
        if not params:
            # Use defaults
            params = self.params

        try:
            if isinstance(msg, (str, bytes)):
                # A single payload: joining it would split it into one
                # character per line and corrupt the data.
                data = msg
            else:
                # Kept inside the try block so that non-string items are
                # reported as a failed write instead of raising.
                data = "\n".join(msg)
            self.client.request(url='write',
                                method='POST',
                                params=params,
                                data=data,
                                expected_response_code=expected_response_code,
                                headers=self.headers
                                )
        except Exception as e:
            # Deliberate best effort: report the failure to the caller
            # through the return value rather than propagating it.
            logging.warning("Cannot write data points: %s", e)
            return False
        return True

    def write08(self):
        """
        TODO: Write in InfluxDB legacy 08 format:
        data = [
            {
                "name": "cpu_load_short",
                "columns": [
                    "value"
                ],
                "points": [
                    [
                        12
                    ]
                ],
            }
        ]
        client.write_points(data, time_precision='s', *args, **kwargs)

        :raises NotImplementedError: always; the legacy format is unsupported
        """
        raise NotImplementedError
118