# pylint: disable=too-many-instance-attributes
import asyncio
import logging
import warnings
from urllib.parse import urljoin
import aiohttp
from .encode import encode_influx_point
class InfluxDBClient:
    """Buffered, asynchronous writer of points to an InfluxDB HTTP endpoint.

    Points handed to :meth:`write` are encoded and appended to an in-memory
    list.  A background asyncio task, started lazily on the first
    :meth:`write` issued while an event loop is running, wakes up every
    ``interval`` seconds and posts everything accumulated so far to the
    ``write`` endpoint below ``url``.  When a flush fails the points are put
    back in front of the buffer, which is then trimmed to the newest
    ``buffer_size`` entries.

    Args:
        url: Base URL of the server; ``write`` and ``query`` are resolved
            against it with :func:`urllib.parse.urljoin`.
        database: Name of the target database.  A ``CREATE DATABASE``
            statement is issued for it before the first flush.
        interval: Seconds to sleep between two flush attempts.
        loop: Deprecated and ignored; passing a value only triggers a
            ``DeprecationWarning``.
        buffer_size: Maximum number of points retained after a failed
            flush, or ``None`` to retain all of them.
        precision: Value sent as the ``precision`` query parameter of every
            write request.
        logger: Logger to report to; defaults to this module's logger.
    """

    def __init__(self,
                 url,
                 database,
                 interval=5.168,
                 loop=None,
                 buffer_size=100,
                 precision='ms',
                 logger=None):
        if loop is not None:
            warnings.warn('loop argument is deprecated', DeprecationWarning, stacklevel=2)
        self._url = url
        self._database = database
        self._db_checked = False
        self._interval = interval
        self._pending = []
        self._buffer_size = buffer_size
        self._precision = precision
        self.logger = logger or logging.getLogger(__name__)
        self._task = None

    def _start_background_task_if_needed(self):
        """Start the periodic writer task unless it is already running.

        Does nothing when called outside a running event loop; points keep
        accumulating until a later call happens inside one.
        """
        if self._task is not None:
            return
        try:
            # get_event_loop() is deprecated without a running loop and
            # raises on recent Python versions, so probe for a running loop
            # explicitly instead.
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return
        self._task = loop.create_task(self._write_to_influx_loop())
        self._task.add_done_callback(self._write_to_influx_loop_finished)

    def close(self):
        """Cancel the background writer task, if there is one.

        Safe to call when no task was ever started or after it has already
        finished.  Points still sitting in the buffer are not flushed.
        """
        task, self._task = self._task, None
        if task is not None:
            task.cancel()

    def write(self, measurement, timestamp, fields, tags):
        """Encode one point and queue it for the next flush.

        The arguments are passed unchanged to ``encode_influx_point``; see
        that helper for their expected types.
        """
        self._start_background_task_if_needed()
        point = encode_influx_point(measurement, timestamp, fields, tags)
        self._pending.append(point)

    @property
    def _session(self):
        """Return the cached ``aiohttp.ClientSession``, creating it on first use."""
        session = getattr(self, '_session_cache', None)
        if session is None:
            session = aiohttp.ClientSession()
            self._session_cache = session
        return session

    async def _close_session(self):
        """Close and forget the cached HTTP session, if one was created."""
        session = getattr(self, '_session_cache', None)
        # Drop the reference before awaiting so that a writer task started in
        # the meantime creates a fresh session instead of reusing this one.
        self._session_cache = None
        if session is not None:
            await session.close()

    def _trim_pending(self):
        """Drop the oldest buffered points so at most ``buffer_size`` remain."""
        if self._buffer_size is None:
            return
        # A negative slice start would treat a size of 0 as "keep everything"
        # (``lst[-0:]`` is the whole list), so compute the overflow instead.
        overflow = len(self._pending) - self._buffer_size
        if overflow > 0:
            del self._pending[:overflow]

    async def _write_to_influx_loop(self):
        """Flush the buffer every ``interval`` seconds until cancelled."""
        try:
            while True:
                # Take ownership of the current batch; points written while
                # the requests below are in flight land in the new list.
                points, self._pending = self._pending, []
                try:
                    if points:
                        await self._check_db_exists()
                        await self._write_to_influx(points)
                        self.logger.debug('Successfully wrote %d points to influx.', len(points))
                    else:
                        self.logger.debug('No points to be written to influx.')
                except asyncio.CancelledError:
                    # On interpreters where CancelledError derives from
                    # Exception the handler below would swallow cancellation.
                    raise
                except Exception as e:  # pylint: disable=broad-except
                    self.logger.error('Failure when writing points to influx: %r', e)
                    # Retry the failed batch first, then cap the backlog.
                    self._pending = points + self._pending
                    self._trim_pending()
                await asyncio.sleep(self._interval)
        except asyncio.CancelledError:
            # Release the HTTP session when the writer is shut down.
            await self._close_session()
            raise

    def _write_to_influx_loop_finished(self, task):
        """Done-callback of the writer task: clear the handle and log failures."""
        # Only clear the handle if it still refers to the finished task;
        # close() followed by write() may already have installed a new one.
        if self._task is task:
            self._task = None
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            # This runs outside an ``except`` block, so the exception has to
            # be passed explicitly for the traceback to be logged.
            self.logger.exception('Exception ending writing to influx! %s', repr(exc),
                                  exc_info=exc)

    async def _write_to_influx(self, points):
        """POST ``points`` (newline-joined, UTF-8 encoded) to the ``write`` endpoint.

        A 400 response is logged and otherwise ignored, so the batch is not
        retried; any other error status raises through
        ``response.raise_for_status()``.

        Returns:
            The number of points submitted.
        """
        self.logger.debug('Going to write lines: \n%s', points)
        # write to db
        url = urljoin(self._url, 'write')
        params = {'db': self._database, 'precision': self._precision}
        data = '\n'.join(points).encode('utf-8')
        async with self._session.post(url, params=params, data=data) as response:
            if response.status == 400:
                error = (await response.json()).get('error', '-')
                self.logger.error('Invalid request error: %s', error)
            else:
                response.raise_for_status()
        return len(points)

    @staticmethod
    def _quote_identifier(name):
        """Return ``name`` as a double-quoted InfluxQL identifier."""
        escaped = str(name).replace('\\', '\\\\').replace('"', '\\"')
        return '"%s"' % escaped

    async def _check_db_exists(self):
        """Issue ``CREATE DATABASE`` once; repeat on later flushes until it succeeds.

        A non-200 answer is only logged; the caller goes on to write anyway.
        """
        if self._db_checked:
            return
        # Quote the name so databases such as ``my-db`` are accepted.
        query = 'CREATE DATABASE %s' % self._quote_identifier(self._database)
        # NOTE(review): the statement is sent with GET; some server versions
        # may expect POST for CREATE statements - confirm against the target.
        async with self._session.get(urljoin(self._url, 'query'), params={'q': query}) as response:
            self._db_checked = response.status == 200
            if self._db_checked:
                self.logger.info('Database %r is ready', self._database)
            else:
                self.logger.error('Could not check db %r. Reason: %s', self._database,
                                  response.reason)