# pylint: disable=unused-variable
import asyncio
import logging
import os
from aiocan.errors import BusError
from aiocan.info import BusInfoType
from aiocan.plugin import BusPlugin
from caspia.toolbox import monitor
from .utils import patch
logger = logging.getLogger(__name__)
[docs]class Plugin(BusPlugin):
def __init__(self, bus):
super().__init__(bus)
self.bus = bus
self.name = os.environ.get('CSP_METRICS_ACAN_NAME')
self.interval = float(os.environ.get('CSP_METRICS_ACAN_INTERVAL', '5.0'))
if not self.name:
return
labels = dict(acan_name=self.name)
# TODO: has to be refactored into instance metrics
monitor.register_metric('acan:sent-bytes', 'integer', collect=True, labels=labels)
monitor.register_metric('acan:sent-messages', 'integer', collect=True, labels=labels)
monitor.register_metric('acan:received-bytes', 'integer', collect=True, labels=labels)
monitor.register_metric('acan:received-messages', 'integer', collect=True, labels=labels)
monitor.register_metric('acan:state', 'integer', labels=labels)
monitor.register_metric('acan:error', 'string', labels=labels)
@patch(bus)
async def send(original_send, message):
await original_send(message)
monitor.record_metric('acan:sent-messages', 1)
monitor.record_metric('acan:sent-bytes', len(message.data))
@patch(bus)
async def recv(original_recv):
msg = await original_recv()
monitor.record_metric('acan:received-messages', 1)
monitor.record_metric('acan:received-bytes', len(msg.data))
if msg.error_frame:
try:
error = BusError.from_message(msg)
monitor.record_metric('acan:error', repr(error))
except Exception: # pylint: disable=broad-except
logger.exception('failed to create error from an error-frame')
return msg
self.loop_task = asyncio.ensure_future(self.loop())
@patch(bus)
async def shutdown(original_shutdown):
await original_shutdown()
self.loop_task.cancel()
[docs] async def loop(self):
while True:
await asyncio.sleep(self.interval)
try:
await self.capture_metrics()
except asyncio.CancelledError:
break
except Exception: # pylint: disable=broad-except
logger.exception('exception when capturing acan metrics')
[docs] async def capture_metrics(self):
info = await self.bus.get_info()
if BusInfoType.STATE in info:
state_value = info[BusInfoType.STATE].value
info[BusInfoType.STATE] = info[BusInfoType.STATE].name
info = {k.value: v for k, v in info.items()}
info['value'] = state_value
else:
info = {k.value: v for k, v in info.items()}
if info:
monitor.record_metric('acan:state', info)
[docs] def shutdown(self):
if hasattr(self, 'loop_task'):
self.loop_task.cancel()