Source code for caspia.toolbox.influxdb.encode

def encode_influx_point(measurement, timestamp, fields, tags):
    """Render one point as a ``measurement[,tags] fields timestamp`` line.

    :param measurement: measurement name; escaped with ``escape_tag``.
    :param timestamp: placed at the end of the line, formatted as-is by
        ``str.format``.
    :param fields: mapping of field name to field value. Names are escaped
        with ``escape_tag`` and values encoded with ``escape_value``.
    :param tags: mapping of tag name to tag value, both escaped with
        ``escape_tag``. When it is empty the tag section (and the comma
        that introduces it) is left out of the line.
    :return: the encoded line as a string.
    """
    tag_pairs = [
        '%s=%s' % (escape_tag(name), escape_tag(val))
        for name, val in tags.items()
    ]
    field_pairs = [
        '%s=%s' % (escape_tag(name), escape_value(val))
        for name, val in fields.items()
    ]

    # Only emit the ",tags" section when there is at least one tag.
    template = (
        '{measurement},{tags} {fields} {timestamp}'
        if tags
        else '{measurement} {fields} {timestamp}'
    )
    return template.format(
        measurement=escape_tag(measurement),
        fields=','.join(field_pairs),
        tags=','.join(tag_pairs),
        timestamp=timestamp,
    )
def escape_tag(tag):
    """Backslash-escape the characters that are special in a tag.

    A backslash is doubled, and each space, comma and equals sign is
    prefixed with a backslash.

    :param tag: the string to escape.
    :return: the escaped string.
    """
    # Double existing backslashes first so that the backslashes inserted
    # below are not escaped a second time.
    escaped = tag.replace("\\", "\\\\")
    for special in (" ", ",", "="):
        escaped = escaped.replace(special, "\\" + special)
    return escaped
def escape_value(value):
    """Encode a field value as text according to its Python type.

    * ``bool``  -> ``str(value)`` (``"True"`` / ``"False"``)
    * ``int``   -> decimal digits followed by ``i``
    * ``float`` -> ``repr(value)``
    * anything else -> ``str(value)`` passed through ``quote_ident``

    :param value: the field value to encode.
    :return: the encoded value as a string.
    """
    # bool has to be checked before int because bool is a subclass of int.
    if isinstance(value, bool):
        return str(value)
    if isinstance(value, int):
        return str(value) + 'i'
    if isinstance(value, float):
        return repr(value)
    return quote_ident(str(value))
def quote_ident(value):
    """Wrap a string in double quotes, escaping what must not appear raw.

    Backslashes are doubled, double quotes are prefixed with a backslash,
    and newline characters are replaced by the two characters ``\\n``.

    :param value: the string to quote.
    :return: the escaped string surrounded by double quotes.
    """
    # Order matters: backslashes are handled first so the ones introduced
    # by the later substitutions are left alone.
    substitutions = (
        ("\\", "\\\\"),
        ("\"", "\\\""),
        ("\n", "\\n"),
    )
    escaped = value
    for raw, replacement in substitutions:
        escaped = escaped.replace(raw, replacement)
    return "\"{}\"".format(escaped)