Source code for caspia.toolbox.influxdb.client

# pylint: disable=too-many-instance-attributes
import asyncio
import logging
import warnings
from urllib.parse import urljoin

import aiohttp

from .encode import encode_influx_point


[docs]class InfluxDBClient: def __init__(self, url, database, interval=5.168, loop=None, buffer_size=100, precision='ms', logger=None): if loop is not None: warnings.warn('loop argument is deprecated', DeprecationWarning, stacklevel=2) self._url = url self._database = database self._db_checked = False self._interval = interval self._pending = [] self._buffer_size = buffer_size self._precision = precision self.logger = logger or logging.getLogger(__name__) self._task = None def _start_background_task_if_needed(self): loop = asyncio.get_event_loop() if not self._task and loop.is_running(): self._task = asyncio.ensure_future(self._write_to_influx_loop()) self._task.add_done_callback(self._write_to_influx_loop_finished)
[docs] def close(self): self._task.cancel() self._task = None
[docs] def write(self, measurement, timestamp, fields, tags): self._start_background_task_if_needed() point = encode_influx_point(measurement, timestamp, fields, tags) self._pending.append(point)
@property def _session(self): session = getattr(self, '_session_cache', None) if not session: session = aiohttp.ClientSession() setattr(self, '_session_cache', session) return session async def _write_to_influx_loop(self): while True: points = self._pending self._pending = [] try: if len(points): await self._check_db_exists() await self._write_to_influx(points) self.logger.debug('Successfully wrote %d points to influx.', len(points)) else: self.logger.debug('No points to be written to influx.') except Exception as e: # pylint: disable=broad-except self.logger.error('Failure when writing points to influx: %r', e) self._pending = points + self._pending if self._buffer_size is not None: self._pending = self._pending[-self._buffer_size:] await asyncio.sleep(self._interval) def _write_to_influx_loop_finished(self, task): self._task = None if not task.cancelled() and task.exception(): self.logger.exception('Exception ending writing to influx! %s', repr(task.exception())) async def _write_to_influx(self, points): self.logger.debug('Going to write lines: \n%s', points) # write to db url = urljoin(self._url, 'write') params = dict(db=self._database, precision=self._precision) data = '\n'.join(points).encode('utf-8') async with self._session.post(url, params=params, data=data) as response: if response.status == 400: error = (await response.json()).get('error', '-') self.logger.error('Invalid request error: %s', error) else: response.raise_for_status() return len(points) async def _check_db_exists(self): if self._db_checked: return query = 'CREATE DATABASE %s' % self._database async with self._session.get(urljoin(self._url, 'query'), params=dict(q=query)) as response: self._db_checked = response.status == 200 if self._db_checked: self.logger.info('Database %r is ready', self._database) else: self.logger.error('Could not check db %r. Reason: %s', self._database, response.reason)