@@ 100-160 (lines=61) @@ | ||
97 | ) |
|
98 | return write_client |
|
99 | ||
100 | def _normalize(self, name, columns, points): |
|
101 | """Normalize data for the InfluxDB's data model. |
|
102 | ||
103 | :return: a list of measurements. |
|
104 | """ |
|
105 | ret = [] |
|
106 | ||
107 | # Build initial dict by crossing columns and point |
|
108 | data_dict = dict(zip(columns, points)) |
|
109 | ||
110 | # issue1871 - Check if a key exist. If a key exist, the value of |
|
111 | # the key should be used as a tag to identify the measurement. |
|
112 | keys_list = [k.split('.')[0] for k in columns if k.endswith('.key')] |
|
113 | if len(keys_list) == 0: |
|
114 | keys_list = [None] |
|
115 | ||
116 | for measurement in keys_list: |
|
117 | # Manage field |
|
118 | if measurement is not None: |
|
119 | fields = { |
|
120 | k.replace('{}.'.format(measurement), ''): data_dict[k] |
|
121 | for k in data_dict |
|
122 | if k.startswith('{}.'.format(measurement)) |
|
123 | } |
|
124 | else: |
|
125 | fields = data_dict |
|
126 | # Transform to InfluxDB datamodel |
|
127 | # https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/ |
|
128 | for k in fields: |
|
129 | # Do not export empty (None) value |
|
130 | if fields[k] is None: |
|
131 | continue |
|
132 | # Convert numerical to float |
|
133 | try: |
|
134 | fields[k] = float(fields[k]) |
|
135 | except (TypeError, ValueError): |
|
136 | # Convert others to string |
|
137 | try: |
|
138 | fields[k] = str(fields[k]) |
|
139 | except (TypeError, ValueError): |
|
140 | pass |
|
141 | # Manage tags |
|
142 | tags = self.parse_tags(self.tags) |
|
143 | if 'key' in fields and fields['key'] in fields: |
|
144 | # Create a tag from the key |
|
145 | # Tag should be an string (see InfluxDB data model) |
|
146 | tags[fields['key']] = str(fields[fields['key']]) |
|
147 | # Remove it from the field list (can not be a field and a tag) |
|
148 | fields.pop(fields['key']) |
|
149 | # Add the hostname as a tag |
|
150 | tags['hostname'] = self.hostname |
|
151 | # Add name as a tag (example for the process list) |
|
152 | for k in FIELD_TO_TAG: |
|
153 | if k in fields: |
|
154 | tags[k] = str(fields[k]) |
|
155 | # Remove it from the field list (can not be a field and a tag) |
|
156 | if k in fields: |
|
157 | fields.pop(fields[k]) |
|
158 | # Add the measurement to the list |
|
159 | ret.append({'measurement': name, 'tags': tags, 'fields': fields}) |
|
160 | return ret |
|
161 | ||
162 | def export(self, name, columns, points): |
|
163 | """Write the points to the InfluxDB server.""" |
@@ 89-149 (lines=61) @@ | ||
86 | ||
87 | return db |
|
88 | ||
89 | def _normalize(self, name, columns, points): |
|
90 | """Normalize data for the InfluxDB's data model. |
|
91 | ||
92 | :return: a list of measurements. |
|
93 | """ |
|
94 | ret = [] |
|
95 | ||
96 | # Build initial dict by crossing columns and point |
|
97 | data_dict = dict(zip(columns, points)) |
|
98 | ||
99 | # issue1871 - Check if a key exist. If a key exist, the value of |
|
100 | # the key should be used as a tag to identify the measurement. |
|
101 | keys_list = [k.split('.')[0] for k in columns if k.endswith('.key')] |
|
102 | if len(keys_list) == 0: |
|
103 | keys_list = [None] |
|
104 | ||
105 | for measurement in keys_list: |
|
106 | # Manage field |
|
107 | if measurement is not None: |
|
108 | fields = { |
|
109 | k.replace('{}.'.format(measurement), ''): data_dict[k] |
|
110 | for k in data_dict |
|
111 | if k.startswith('{}.'.format(measurement)) |
|
112 | } |
|
113 | else: |
|
114 | fields = data_dict |
|
115 | # Transform to InfluxDB data model |
|
116 | # https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/ |
|
117 | for k in fields: |
|
118 | # Do not export empty (None) value |
|
119 | if fields[k] is None: |
|
120 | continue |
|
121 | # Convert numerical to float |
|
122 | try: |
|
123 | fields[k] = float(fields[k]) |
|
124 | except (TypeError, ValueError): |
|
125 | # Convert others to string |
|
126 | try: |
|
127 | fields[k] = str(fields[k]) |
|
128 | except (TypeError, ValueError): |
|
129 | pass |
|
130 | # Manage tags |
|
131 | tags = self.parse_tags(self.tags) |
|
132 | if 'key' in fields and fields['key'] in fields: |
|
133 | # Create a tag from the key |
|
134 | # Tag should be an string (see InfluxDB data model) |
|
135 | tags[fields['key']] = str(fields[fields['key']]) |
|
136 | # Remove it from the field list (can not be a field and a tag) |
|
137 | fields.pop(fields['key']) |
|
138 | # Add the hostname as a tag |
|
139 | tags['hostname'] = self.hostname |
|
140 | # Add name as a tag (example for the process list) |
|
141 | for k in FIELD_TO_TAG: |
|
142 | if k in fields: |
|
143 | tags[k] = str(fields[k]) |
|
144 | # Remove it from the field list (can not be a field and a tag) |
|
145 | if k in fields: |
|
146 | fields.pop(fields[k]) |
|
147 | # Add the measurement to the list |
|
148 | ret.append({'measurement': name, 'tags': tags, 'fields': fields}) |
|
149 | return ret |
|
150 | ||
151 | def export(self, name, columns, points): |
|
152 | """Write the points to the InfluxDB server.""" |