import asyncio
import logging
from caspia.node import Broadcast, Listener, Node, components
from caspia.toolbox import monitor
# Module-level logger; used below to report heartbeat requests and unexpected errors.
logger = logging.getLogger(__name__)
class AvailabilityMonitor:
    """Periodically check which nodes were heard from recently and publish metrics.

    A node is considered available when a component event originating from it
    was processed less than ``timeout`` seconds ago (measured with the event
    loop's clock).  Every ``interval`` seconds the monitor records per-node and
    aggregate availability metrics and, when any node is unavailable or about
    to time out before the next check, asks for fresh heartbeats through
    ``Node.request_heartbeats``.
    """

    def __init__(self, gateway_name, pollen_client, network, interval=5, timeout=60, loop=None):
        """Register the metrics, subscribe to broadcasts and start the check loop.

        :param gateway_name: value of the ``gateway_name`` label attached to the
            registered metrics.
        :param pollen_client: client used to listen for broadcasts and passed to
            ``Node.request_heartbeats``.
        :param network: object whose ``cidmng.name_of(cid)`` maps a node CID to
            a node name (or ``None``).
        :param interval: seconds to sleep between two availability checks.
        :param timeout: seconds without any event after which a node is
            reported as unavailable.
        :param loop: event loop to run on; defaults to ``asyncio.get_event_loop()``.
        """
        self._loop = loop or asyncio.get_event_loop()
        self._pollen_client = pollen_client
        self._network = network
        self._timeout = timeout
        self._interval = interval

        # node name -> loop timestamp of the last event seen (None = never seen)
        self._last_available = dict()
        # loop timestamp of the last finished check (None = no check since the
        # node set was last assigned)
        self._last_check = None
        self._nodes = set()  # type: set[str]

        self._listener = Listener(on_component_event=self._on_component_event)
        self._pollen_client.listen_broadcast(self._on_raw_broadcast)

        # register metrics
        labels = dict(gateway_name=gateway_name)
        monitor.register_metric('caspia-gateway:node-available', 'boolean', labels=labels)
        monitor.register_metric('caspia-gateway:nodes-available', 'integer', labels=labels)
        monitor.register_metric('caspia-gateway:nodes-unavailable', 'integer', labels=labels)
        monitor.register_metric('caspia-gateway:heartbeat-request', 'boolean', labels=labels)
        monitor.register_metric('caspia-gateway:node-reset', 'string', labels=labels)

        # start monitoring
        # Keep a strong reference to the task: the event loop itself only holds
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._task = asyncio.ensure_future(self._check_availability_loop(), loop=self._loop)

    def _node_status(self, node_name, now):
        """Return ``(available, will_be_unavailable)`` for one node at time ``now``.

        ``available`` is true when the node was last seen less than the timeout
        ago; ``will_be_unavailable`` is true when the node would exceed the
        timeout by the time of the next check (``now + interval``).  A node
        that has never been seen is reported as ``(False, True)`` regardless of
        the clock value, so a small monotonic clock reading (e.g. shortly after
        boot) cannot make it look available.
        """
        last_available = self._last_available.get(node_name)
        if last_available is None:
            return False, True

        available = now - last_available < self._timeout
        will_be_unavailable = now + self._interval - last_available > self._timeout
        return available, will_be_unavailable

    async def _check_availability(self):
        """Run one availability check and request heartbeats when needed.

        On the first check after the node set was assigned there is nothing to
        evaluate yet, so heartbeats are requested unconditionally.  Otherwise
        the per-node and aggregate metrics are recorded and heartbeats are
        requested only if some node is unavailable or about to time out.
        """
        if self._last_check is None:
            await Node.request_heartbeats(self._pollen_client)
        else:
            now = self._loop.time()
            request_heartbeats = False
            unavailable_count = 0

            for node_name in self._nodes:
                labels = dict(node_name=node_name)
                available, will_be_unavailable = self._node_status(node_name, now)
                request_heartbeats |= not available or will_be_unavailable

                monitor.record_metric('caspia-gateway:node-available', available, labels=labels)
                if not available:
                    unavailable_count += 1

            monitor.record_metric('caspia-gateway:heartbeat-request', request_heartbeats)
            monitor.record_metric('caspia-gateway:nodes-available',
                                  len(self._nodes) - unavailable_count)
            monitor.record_metric('caspia-gateway:nodes-unavailable', unavailable_count)

            if request_heartbeats:
                await Node.request_heartbeats(self._pollen_client)
                logger.info('Requesting heartbeats')

        self._last_check = self._loop.time()

    async def _check_availability_loop(self):
        """Run ``_check_availability`` forever, sleeping ``interval`` seconds between runs.

        Unexpected errors are logged and the loop keeps going; cancellation is
        always propagated so the task can be stopped.
        """
        while True:
            try:
                await self._check_availability()
            except asyncio.CancelledError:
                # Before Python 3.8 CancelledError derives from Exception and
                # would be swallowed by the broad handler below.
                raise
            except Exception:  # pylint: disable=broad-except
                logger.exception('There was an exception in availability monitor')
            await asyncio.sleep(self._interval)

    @property
    def nodes(self):
        """Collection of node names whose availability is being monitored."""
        return self._nodes

    @nodes.setter
    def nodes(self, value):
        """Replace the monitored nodes and forget all previously seen timestamps.

        Every node starts as "never seen" and the next check requests
        heartbeats unconditionally.  Entries of nodes that are no longer
        monitored are dropped so the bookkeeping dict cannot grow without bound.
        """
        self._nodes = value
        self._last_available.clear()
        self._last_available.update(dict.fromkeys(self._nodes))
        self._last_check = None

    def _on_raw_broadcast(self, *args, **kwargs):
        """Wrap the raw broadcast arguments in a ``Broadcast`` and hand it to the listener."""
        self._listener.process_broadcast(Broadcast(*args, **kwargs))

    def _on_component_event(self, node_cid, component_id, component_type, event):
        """Mark the originating node as seen and record a reset reason if it restarted.

        Events from nodes whose CID cannot be resolved to a name, or whose name
        is not among the monitored nodes, are ignored.
        """
        name = self._network.cidmng.name_of(node_cid)
        if name is None or name not in self._nodes:
            return

        # log availability of the node
        self._last_available[name] = self._loop.time()

        # log reset reason (if the node just restarted)
        if isinstance(event, components.System.HeartbeatEvent) and event.after_restart:
            labels = dict(node_name=name)
            reason = event.reset_reason.name.lower()
            monitor.record_metric('caspia-gateway:node-reset', reason, labels=labels)