InfluxDBWriter.__init__()   B
last analyzed

Complexity

Conditions 3

Size

Total Lines 37

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
c 0
b 0
f 0
dl 0
loc 37
rs 8.8571
1
# -*- coding: utf-8 -*-
2
3
import logging
4
import influxdb
5
6
try:
7
    # Test for mypy support (requires Python 3)
8
    from typing import Mapping, Text
9
except ImportError:
10
    pass
11
12
13
class InfluxDBWriter(object):
14
    DEFAULT_HEADERS = {
15
        'Content-type': 'application/octet-stream',
16
        'Accept': 'text/plain'
17
    }
18
19
    def __init__(self,
20
                 host,
21
                 port,
22
                 user,
23
                 password,
24
                 dbname,
25
                 use_ssl=False,
26
                 verify_ssl=False,
27
                 timeout=5,
28
                 use_udp=False,
29
                 retention_policy=None,
30
                 time_precision=None):
31
        """
32
        Initialize InfluxDB writer
33
        """
34
        self.host = host
35
        self.port = port
36
        self.user = user
37
        self.password = password
38
        self.dbname = dbname
39
        self.use_ssl = use_ssl
40
        self.verify_ssl = verify_ssl
41
        self.timeout = timeout
42
        self.use_udp = use_udp
43
        self.retention_policy = retention_policy
44
        self.time_precision = time_precision
45
46
        self.params = {'db': self.dbname}
47
        self.headers = self.DEFAULT_HEADERS
48
        if time_precision:
49
            self.params['precision'] = time_precision
50
        if retention_policy:
51
            self.params['rp'] = retention_policy
52
53
        logging.info("Connecting to InfluxDB at %s:%s (SSL: %r, UDP: %r)",
54
                     host, port, use_ssl, use_udp)
55
        self.client = self.create_client()
56
57
    def create_client(self):
58
        """
59
        Create an InfluxDB client
60
        """
61
        return influxdb.InfluxDBClient(self.host,
62
                                       self.port,
63
                                       self.user,
64
                                       self.password,
65
                                       self.dbname,
66
                                       self.use_ssl,
67
                                       self.verify_ssl,
68
                                       self.timeout,
69
                                       self.use_udp,
70
                                       self.port)
71
72
    def create_database(self, dbname):
73
        # type: (Text) -> bool
74
        """
75
        Initialize the given database
76
        :param dbname:
77
        """
78
        self.client.create_database(dbname)
79
80
    def write(self, msg, params=None, expected_response_code=204):
81
        # type: (Text, Mapping[str, str], int) -> bool
82
        """
83
        Write messages to InfluxDB database.
84
        Expects messages in line protocol format.
85
        See https://influxdb.com/docs/v0.9/write_protocols/line.html
86
        :param expected_response_code:
87
        :param params:
88
        :param msg:
89
        """
90
        if not params:
91
            # Use defaults
92
            params = self.params
93
94
        try:
95
            logging.debug("Writing message: %s", msg)
96
            self.client.request(url='write',
97
                                method='POST',
98
                                params=params,
99
                                data="\n".join(msg),
100
                                expected_response_code=expected_response_code,
101
                                headers=self.headers
102
                                )
103
        except Exception as e:
104
            logging.warning("Cannot write data points: %s", e)
105
            return False
106
        return True
107