| @@ 99-150 (lines=52) @@ | ||
| 96 | ||
| 97 | return db |
|
| 98 | ||
def _normalize(self, name, columns, points):
    """Normalize data for the InfluxDB's data model.

    The columns and the points are crossed to build a dict. If some
    columns end with '.key', one measurement is built per key prefix,
    using only the columns starting with '<prefix>.' (prefix removed
    from the field names). Otherwise a single measurement is built
    from all the columns.

    For each measurement:
    - None values are not exported,
    - values are converted to float when possible, else to string,
    - if a 'key' field names another field, this other field is moved
      from the fields to the tags,
    - the hostname is added as a tag.

    Output is a list of measurements: dicts with the 'measurement',
    'tags' and 'fields' keys.
    """
    ret = []

    # Build initial dict by crossing columns and point
    data_dict = dict(zip(columns, points))

    # issue1871 - Check if a key exist. If a key exist, the value of
    # the key should be used as a tag to identify the measurement.
    # NOTE(review): only the text before the first '.' is kept, so a
    # prefix which itself contains a '.' is truncated - confirm that
    # such prefixes can not happen.
    keys_list = [k.split('.')[0] for k in columns if k.endswith('.key')]
    if not keys_list:
        keys_list = [None]

    for measurement in keys_list:
        # Manage field
        if measurement is not None:
            prefix = '{}.'.format(measurement)
            # Only strip the leading prefix (str.replace would also
            # remove any later occurrence inside the column name)
            raw_fields = {k[len(prefix):]: v
                          for k, v in data_dict.items()
                          if k.startswith(prefix)}
        else:
            raw_fields = data_dict

        # Transform to InfluxDB datamodel
        # https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/
        # A new dict is built instead of popping entries from the dict
        # being iterated: popping raised a RuntimeError (dictionary
        # changed size during iteration) as soon as a value was None.
        fields = {}
        for k, v in raw_fields.items():
            # Do not export empty (None) value
            if v is None:
                continue
            # Convert numerical to float
            try:
                fields[k] = float(v)
            except (TypeError, ValueError):
                # Convert others to string
                try:
                    fields[k] = str(v)
                except (TypeError, ValueError):
                    # Keep the value untouched if it can not be converted
                    fields[k] = v

        # Manage tags
        # NOTE(review): parse_tags is assumed to return a new dict on
        # each call (it is mutated below) - confirm.
        tags = self.parse_tags(self.tags)
        if 'key' in fields and fields['key'] in fields:
            # Create a tag from the key
            # Tag should be an string (see InfluxDB data model)
            tags[fields['key']] = str(fields[fields['key']])
            # Remove it from the field list (can not be a field and a tag)
            fields.pop(fields['key'])
        # Add the hostname as a tag
        tags['hostname'] = self.hostname
        # Add the measurement to the list
        ret.append({'measurement': name,
                    'tags': tags,
                    'fields': fields})
    return ret
|
| 151 | ||
| 152 | def export(self, name, columns, points): |
|
| 153 | """Write the points to the InfluxDB server.""" |
|
| @@ 89-139 (lines=51) @@ | ||
| 86 | exponential_base=2)) |
|
| 87 | return write_client |
|
| 88 | ||
def _normalize(self, name, columns, points):
    """Normalize data for the InfluxDB's data model.

    The columns and the points are crossed to build a dict. If some
    columns end with '.key', one measurement is built per key prefix,
    using only the columns starting with '<prefix>.' (prefix removed
    from the field names). Otherwise a single measurement is built
    from all the columns.

    For each measurement:
    - None values are not exported,
    - values are converted to float when possible, else to string,
    - if a 'key' field names another field, this other field is moved
      from the fields to the tags,
    - the hostname is added as a tag.

    Output is a list of dicts with the 'measurement', 'tags' and
    'fields' keys.
    """
    ret = []

    # Build initial dict by crossing columns and point
    data_dict = dict(zip(columns, points))

    # issue1871 - Check if a key exist. If a key exist, the value of
    # the key should be used as a tag to identify the measurement.
    # NOTE(review): only the text before the first '.' is kept, so a
    # prefix which itself contains a '.' is truncated - confirm that
    # such prefixes can not happen.
    keys_list = [k.split('.')[0] for k in columns if k.endswith('.key')]
    if not keys_list:
        keys_list = [None]

    for measurement in keys_list:
        # Manage field
        if measurement is not None:
            prefix = '{}.'.format(measurement)
            # Only strip the leading prefix (str.replace would also
            # remove any later occurrence inside the column name)
            raw_fields = {k[len(prefix):]: v
                          for k, v in data_dict.items()
                          if k.startswith(prefix)}
        else:
            raw_fields = data_dict

        # Transform to InfluxDB datamodel
        # https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/
        # A new dict is built instead of popping entries from the dict
        # being iterated: popping raised a RuntimeError (dictionary
        # changed size during iteration) as soon as a value was None.
        fields = {}
        for k, v in raw_fields.items():
            # Do not export empty (None) value
            if v is None:
                continue
            # Convert numerical to float
            try:
                fields[k] = float(v)
            except (TypeError, ValueError):
                # Convert others to string
                try:
                    fields[k] = str(v)
                except (TypeError, ValueError):
                    # Keep the value untouched if it can not be converted
                    fields[k] = v

        # Manage tags
        # NOTE(review): parse_tags is assumed to return a new dict on
        # each call (it is mutated below) - confirm.
        tags = self.parse_tags(self.tags)
        if 'key' in fields and fields['key'] in fields:
            # Create a tag from the key
            # Tag should be an string (see InfluxDB data model)
            tags[fields['key']] = str(fields[fields['key']])
            # Remove it from the field list (can not be a field and a tag)
            fields.pop(fields['key'])
        # Add the hostname as a tag
        tags['hostname'] = self.hostname
        # Add the measurement to the list
        ret.append({'measurement': name,
                    'tags': tags,
                    'fields': fields})
    return ret
|
| 140 | ||
| 141 | def export(self, name, columns, points): |
|
| 142 | """Write the points to the InfluxDB server.""" |
|