"""Node availability monitoring (module ``caspia.gateway.monitor.availability``)."""

import asyncio
import logging

from caspia.node import Broadcast, Listener, Node, components
from caspia.toolbox import monitor

# Module logger; AvailabilityMonitor logs heartbeat requests and check failures here.
logger = logging.getLogger(__name__)


class AvailabilityMonitor:
    """Track which gateway nodes are alive and publish availability metrics.

    A node counts as *available* while some component event from it has been
    seen within the last ``timeout`` seconds. Every ``interval`` seconds the
    monitor re-evaluates all watched nodes (see :attr:`nodes`), records the
    ``caspia-gateway:*`` metrics and, if any node is unavailable or would become
    unavailable before the next check, requests heartbeats from the nodes.
    """

    def __init__(self, gateway_name, pollen_client, network, interval=5, timeout=60, loop=None):
        """Create the monitor, register its metrics and start the check loop.

        :param gateway_name: value of the ``gateway_name`` label of the
            registered metrics.
        :param pollen_client: client used to receive raw broadcasts
            (``listen_broadcast``) and to request heartbeats.
        :param network: object whose ``cidmng.name_of(cid)`` resolves a node
            cid to the node's name.
        :param interval: seconds between two availability checks.
        :param timeout: seconds of silence after which a node is unavailable.
        :param loop: event loop used for timing and for the background task;
            defaults to ``asyncio.get_event_loop()``.
        """
        self._loop = loop or asyncio.get_event_loop()
        self._pollen_client = pollen_client
        self._network = network
        self._timeout = timeout
        self._interval = interval
        # node name -> loop time of the last event seen from it (None = never)
        self._last_available = {}
        # loop time of the last completed check; None makes the next check only
        # request heartbeats instead of evaluating the nodes
        self._last_check = None
        self._nodes = set()  # type: set[str]
        self._listener = Listener(on_component_event=self._on_component_event)
        self._pollen_client.listen_broadcast(self._on_raw_broadcast)

        # register metrics
        labels = dict(gateway_name=gateway_name)
        monitor.register_metric('caspia-gateway:node-available', 'boolean', labels=labels)
        monitor.register_metric('caspia-gateway:nodes-available', 'integer', labels=labels)
        monitor.register_metric('caspia-gateway:nodes-unavailable', 'integer', labels=labels)
        monitor.register_metric('caspia-gateway:heartbeat-request', 'boolean', labels=labels)
        monitor.register_metric('caspia-gateway:node-reset', 'string', labels=labels)

        # start monitoring; asyncio keeps only weak references to tasks, so hold
        # one here (this also lets the owner cancel the loop later)
        self._task = asyncio.ensure_future(self._check_availability_loop(), loop=self._loop)

    @staticmethod
    def _node_status(now, last_available, interval, timeout):
        """Return ``(available, will_be_unavailable)`` for a single node.

        :param now: current loop time.
        :param last_available: loop time of the node's last event, or ``None``
            if nothing has been received from it yet.
        :param interval: seconds until the next check.
        :param timeout: seconds of silence after which a node is unavailable.
        """
        if last_available is None:
            # Never heard from. Do not substitute 0 here: loop.time() is a
            # monotonic clock with an unspecified origin and may itself be
            # smaller than ``timeout`` (e.g. shortly after boot).
            return False, True
        available = now - last_available < timeout
        will_be_unavailable = now + interval - last_available > timeout
        return available, will_be_unavailable

    async def _check_availability(self):
        """Run a single availability check.

        If there was no previous check (start-up, or :attr:`nodes` was just
        reassigned), heartbeats are requested and nothing is recorded.
        Otherwise per-node and aggregate metrics are recorded, and heartbeats
        are requested when any node is unavailable or would time out before
        the next check.
        """
        if self._last_check is None:
            await Node.request_heartbeats(self._pollen_client)
        else:
            now = self._loop.time()
            request_heartbeats = False
            unavailable_count = 0

            for node_name in self._nodes:
                available, will_be_unavailable = self._node_status(
                    now, self._last_available.get(node_name), self._interval, self._timeout)
                request_heartbeats |= not available or will_be_unavailable
                monitor.record_metric('caspia-gateway:node-available', available,
                                      labels=dict(node_name=node_name))
                if not available:
                    unavailable_count += 1

            monitor.record_metric('caspia-gateway:heartbeat-request', request_heartbeats)
            monitor.record_metric('caspia-gateway:nodes-available',
                                  len(self._nodes) - unavailable_count)
            monitor.record_metric('caspia-gateway:nodes-unavailable', unavailable_count)

            if request_heartbeats:
                logger.info('Requesting heartbeats')
                await Node.request_heartbeats(self._pollen_client)

        self._last_check = self._loop.time()

    async def _check_availability_loop(self):
        """Run :meth:`_check_availability` every ``interval`` seconds, forever.

        A failing check is logged and does not stop the loop; cancellation is
        always propagated so the task can be stopped.
        """
        while True:
            try:
                await self._check_availability()
            except asyncio.CancelledError:
                # Before Python 3.8 CancelledError subclasses Exception and would
                # be swallowed by the handler below, making the task uncancellable.
                raise
            except Exception:  # pylint: disable=broad-except
                logger.exception('There was an exception in availability monitor')
            await asyncio.sleep(self._interval)

    @property
    def nodes(self):
        """Names of the watched nodes.

        Assigning a new collection stores it as-is (not copied), marks every
        node in it as not yet heard from, and makes the next check request
        heartbeats instead of reporting metrics.
        """
        return self._nodes

    @nodes.setter
    def nodes(self, value):
        self._nodes = value
        for node in self._nodes:
            self._last_available[node] = None
        self._last_check = None

    def _on_raw_broadcast(self, *args, **kwargs):
        """Wrap a raw broadcast from the pollen client and pass it to the listener."""
        self._listener.process_broadcast(Broadcast(*args, **kwargs))

    def _on_component_event(self, node_cid, component_id, component_type, event):
        """Mark a watched node as alive; record its reset reason after a restart.

        Events from nodes whose name cannot be resolved or that are not in
        :attr:`nodes` are ignored.
        """
        name = self._network.cidmng.name_of(node_cid)
        if name is None or name not in self._nodes:
            return

        # any event from the node shows it is reachable right now
        self._last_available[name] = self._loop.time()

        # report why the node reset, if this heartbeat follows a restart
        if isinstance(event, components.System.HeartbeatEvent) and event.after_restart:
            reason = event.reset_reason.name.lower()
            monitor.record_metric('caspia-gateway:node-reset', reason,
                                  labels=dict(node_name=name))