Source code for caspia.toolbox.plugins.acan_metrics

# pylint: disable=unused-variable
import asyncio
import logging
import os

from aiocan.errors import BusError
from aiocan.info import BusInfoType
from aiocan.plugin import BusPlugin

from caspia.toolbox import monitor

from .utils import patch

logger = logging.getLogger(__name__)


[docs]class Plugin(BusPlugin): def __init__(self, bus): super().__init__(bus) self.bus = bus self.name = os.environ.get('CSP_METRICS_ACAN_NAME') self.interval = float(os.environ.get('CSP_METRICS_ACAN_INTERVAL', '5.0')) if not self.name: return labels = dict(acan_name=self.name) # TODO: has to be refactored into instance metrics monitor.register_metric('acan:sent-bytes', 'integer', collect=True, labels=labels) monitor.register_metric('acan:sent-messages', 'integer', collect=True, labels=labels) monitor.register_metric('acan:received-bytes', 'integer', collect=True, labels=labels) monitor.register_metric('acan:received-messages', 'integer', collect=True, labels=labels) monitor.register_metric('acan:state', 'integer', labels=labels) monitor.register_metric('acan:error', 'string', labels=labels) @patch(bus) async def send(original_send, message): await original_send(message) monitor.record_metric('acan:sent-messages', 1) monitor.record_metric('acan:sent-bytes', len(message.data)) @patch(bus) async def recv(original_recv): msg = await original_recv() monitor.record_metric('acan:received-messages', 1) monitor.record_metric('acan:received-bytes', len(msg.data)) if msg.error_frame: try: error = BusError.from_message(msg) monitor.record_metric('acan:error', repr(error)) except Exception: # pylint: disable=broad-except logger.exception('failed to create error from an error-frame') return msg self.loop_task = asyncio.ensure_future(self.loop()) @patch(bus) async def shutdown(original_shutdown): await original_shutdown() self.loop_task.cancel()
[docs] async def loop(self): while True: await asyncio.sleep(self.interval) try: await self.capture_metrics() except asyncio.CancelledError: break except Exception: # pylint: disable=broad-except logger.exception('exception when capturing acan metrics')
[docs] async def capture_metrics(self): info = await self.bus.get_info() if BusInfoType.STATE in info: state_value = info[BusInfoType.STATE].value info[BusInfoType.STATE] = info[BusInfoType.STATE].name info = {k.value: v for k, v in info.items()} info['value'] = state_value else: info = {k.value: v for k, v in info.items()} if info: monitor.record_metric('acan:state', info)
[docs] def shutdown(self): if hasattr(self, 'loop_task'): self.loop_task.cancel()