Source code for caspia.meadow.client.connection.subscriber

# pylint: disable=protected-access
import asyncio
import logging

from caspia.meadow import errors, utils
from caspia.toolbox import monitor

logger = logging.getLogger(__name__)


class BaseSubscriber:
    """Meadow's topic subscriber.

    Pairs a ``topic`` with a ``target`` that subclasses invoke or resolve
    when a message for that topic is delivered. ``client`` starts out as
    ``None``; nothing in this class assigns it, so whoever attaches the
    subscriber is expected to set it.
    """

    def __init__(self, topic, target):
        """Store the topic and target; no client is attached yet."""
        self.client = None
        self.target = target
        self.topic = topic

    def unsubscribe(self):
        """Detach this subscriber from its client, if it has one."""
        if not self.client:
            return
        self.client.detach_subscriber(self)

    async def call_target(self, *args, **kwargs):
        """Call the target through ``utils.acall`` and return the awaited result."""
        outcome = await utils.acall(self.target, *args, **kwargs)
        return outcome

    def is_future(self):
        """Return ``True`` when the target is an ``asyncio.Future``."""
        target = self.target
        return isinstance(target, asyncio.Future)

    def is_coroutine(self):
        """Return ``True`` when the target is a coroutine function."""
        target = self.target
        return asyncio.iscoroutinefunction(target)

    def _extract_value(self, payload):
        """Return the payload's ``'val'`` entry, or ``None`` when absent."""
        value = payload.get('val')
        return value

    def _extract_req_id(self, payload):
        """Return the payload's ``'req_id'`` entry, or ``None`` when absent."""
        request_id = payload.get('req_id')
        return request_id

    def _extract_error(self, payload):
        """Build a ``MeadowError`` from the payload via ``MeadowError.from_dict``."""
        error_cls = errors.MeadowError
        return error_cls.from_dict(payload)

    def _extract_extra(self, payload):
        """Return the payload's ``'extra'`` entry, or a new empty dict when absent."""
        extra = payload.get('extra', {})
        return extra
class AnonymousSubscriber(BaseSubscriber):
    """Subscriber that hands the raw payload straight to its target."""

    async def __call__(self, payload, topic):
        """Call the target with ``payload`` only and return its result.

        ``topic`` is accepted but not forwarded to the target.
        """
        outcome = await self.call_target(payload)
        return outcome
class ReadResponseSubscriber(BaseSubscriber):
    """Resolve the future of a pending read from its response payload."""

    async def __call__(self, payload, topic):
        """Complete the target future with the read result or its error.

        A payload containing ``'val'`` resolves the future with a
        ``(value, extra)`` tuple; any other payload is converted into a
        ``MeadowError`` and set as the future's exception.

        Raises:
            TypeError: if the target is not an ``asyncio.Future``.
        """
        # ``is_future`` is a method and has to be called: the bare bound
        # method object is always truthy, which made this guard a no-op.
        if not self.is_future():
            raise TypeError('target in ReadResponseSubscriber must be a Future')

        logger.debug('Got read response %s %s', self.topic, payload)

        # NOTE(review): ``set_result`` / ``set_exception`` raise
        # ``asyncio.InvalidStateError`` on a future that is already done
        # (e.g. cancelled) -- confirm the client detaches this subscriber
        # before that can happen.
        if 'val' not in payload:
            error = self._extract_error(payload)
            self.target.set_exception(error)
        else:
            val = self._extract_value(payload)
            extra = self._extract_extra(payload)
            self.target.set_result((val, extra))
class WriteResponseSubscriber(BaseSubscriber):
    """Resolve the future of a pending write from its response payload."""

    async def __call__(self, payload, topic):
        """Complete the target future according to the write response.

        An empty (falsy) payload means success and resolves the future with
        ``None``; a non-empty payload is converted into a ``MeadowError``
        and set as the future's exception.

        Raises:
            TypeError: if the target is not an ``asyncio.Future``.
        """
        # ``is_future`` is a method and has to be called: the bare bound
        # method object is always truthy, which made this guard a no-op.
        if not self.is_future():
            raise TypeError('target in WriteResponseSubscriber must be a Future')

        logger.debug('Got write response %s %s', self.topic, payload)

        # NOTE(review): ``set_result`` / ``set_exception`` raise
        # ``asyncio.InvalidStateError`` on a future that is already done
        # (e.g. cancelled) -- confirm the client detaches this subscriber
        # before that can happen.
        if not payload:
            self.target.set_result(None)
        else:
            error = self._extract_error(payload)
            self.target.set_exception(error)
class NotificationSubscriber(BaseSubscriber):
    """Route notifications to a value callback or an error callback."""

    def __init__(self, topic, on_value, on_error):
        """Keep both callbacks; ``on_value`` is the initial target."""
        super().__init__(topic, on_value)
        self.on_value = on_value
        self.on_error = on_error

    async def __call__(self, payload, topic):
        """Dispatch ``payload`` to ``on_value`` or ``on_error``.

        ``topic`` must split on ``'/'`` into exactly three parts; the last
        two are passed to the callback as ``service`` and
        ``characteristic``. A payload with ``'val'`` goes to ``on_value``,
        any other payload goes to ``on_error``. When the matching callback
        is falsy the notification is dropped.
        """
        _prefix, service, characteristic = topic.split('/')
        location = {'service': service, 'characteristic': characteristic}

        if 'val' in payload:
            if not self.on_value:
                return
            value = self._extract_value(payload)
            extra = self._extract_extra(payload)
            # ``call_target`` always invokes ``self.target``, so it is
            # re-pointed at the chosen callback before each call.
            self.target = self.on_value
            await self.call_target(value=value, extra=extra, **location)
            return

        if not self.on_error:
            return
        failure = self._extract_error(payload)
        self.target = self.on_error
        await self.call_target(error=failure, **location)
class ReadSubscriber(BaseSubscriber):
    """Serve read requests by calling the target and publishing its answer."""

    def __init__(self, *args, **kwargs):
        """Initialise the subscriber with ``$`` characteristics disabled."""
        super().__init__(*args, **kwargs)
        self.allow_dolar_characteristics = False

    async def __call__(self, request, topic):
        """Handle one read request and publish the response.

        ``topic`` must split on ``'/'`` into exactly three parts. Requests
        for characteristics starting with ``'$'`` are ignored unless
        ``allow_dolar_characteristics`` is set. The target receives
        ``service`` / ``characteristic`` keyword arguments only when the
        subscription topic contains ``'+'``. A tuple return value is
        unpacked as ``(value, extra)``; any exception is converted into a
        ``MeadowError`` payload. The response goes to
        ``response/<service>/<char>/<req_id>``.
        """
        _method, service, char = topic.split('/')
        if not self.allow_dolar_characteristics and char.startswith('$'):
            return

        call_kwargs = {}
        if '+' in self.topic:
            call_kwargs = {'service': service, 'characteristic': char}

        try:
            result = await self.call_target(**call_kwargs)
            if isinstance(result, tuple):
                # Unpacking stays inside the ``try`` so a tuple of the
                # wrong length is reported as an error response.
                value, extra = result
                payload = {'val': value, 'extra': extra}
            else:
                payload = {'val': result}
        except Exception as exc:  # pylint: disable=broad-except
            payload = errors.MeadowError.from_exception(exc).to_dict()

        req_id = self._extract_req_id(request)
        response_topic = '/'.join(('response', service, char, req_id))
        await self.client._publish(response_topic, payload)

        monitor.record_metric('meadow-connection:handle-read-ops', 1)
        failed = 'val' not in payload
        if failed:
            monitor.record_metric('meadow-connection:handle-read-err', 1)
class WriteSubscriber(BaseSubscriber):
    """Serve write requests by calling the target and publishing the outcome."""

    def __init__(self, *args, **kwargs):
        """Initialise the subscriber with ``$`` characteristics disabled."""
        super().__init__(*args, **kwargs)
        self.allow_dolar_characteristics = False

    async def __call__(self, request, topic):
        """Handle one write request and publish the response.

        ``topic`` must split on ``'/'`` into exactly three parts. Requests
        for characteristics starting with ``'$'`` are ignored unless
        ``allow_dolar_characteristics`` is set. The target is called with
        the request value positionally plus ``extra``, and with
        ``service`` / ``characteristic`` only when the subscription topic
        contains ``'+'``. Success publishes an empty payload; any exception
        is converted into a ``MeadowError`` payload. The response goes to
        ``response/<service>/<char>/<req_id>``.
        """
        _method, service, char = topic.split('/')
        if not self.allow_dolar_characteristics and char.startswith('$'):
            return

        call_kwargs = {}
        if '+' in self.topic:
            call_kwargs = {'service': service, 'characteristic': char}

        req_id = self._extract_req_id(request)
        value = self._extract_value(request)
        extra = self._extract_extra(request)

        payload = {}
        try:
            await self.call_target(value, extra=extra, **call_kwargs)
        except Exception as exc:  # pylint: disable=broad-except
            payload = errors.MeadowError.from_exception(exc).to_dict()

        response_topic = '/'.join(('response', service, char, req_id))
        await self.client._publish(response_topic, payload)

        monitor.record_metric('meadow-connection:handle-write-ops', 1)
        # A non-empty payload is the serialised error from the except branch.
        if payload:
            monitor.record_metric('meadow-connection:handle-write-err', 1)
class WriteSniffer(BaseSubscriber):
    """Forward write requests to the target without publishing a response."""

    async def __call__(self, request, topic):
        """Pass the request's value and extra data to the target.

        ``topic`` must split on ``'/'`` into exactly three parts. The
        ``service`` / ``characteristic`` keyword arguments are supplied
        only when the subscription topic contains ``'+'``.
        """
        _method, service, char = topic.split('/')

        call_kwargs = {}
        if '+' in self.topic:
            call_kwargs = {'service': service, 'characteristic': char}

        value = self._extract_value(request)
        extra = self._extract_extra(request)
        await self.call_target(value, extra=extra, **call_kwargs)
class RulesSubscriber(BaseSubscriber):
    """Subscriber that hands the payload's ``'rules'`` entry to its target."""

    async def __call__(self, payload, topic):
        """Call the target with ``payload['rules']``.

        Raises ``KeyError`` when the payload has no ``'rules'`` key.
        ``topic`` is not used.
        """
        rules = payload['rules']
        await self.call_target(rules)
class GatewaySubscriber(BaseSubscriber):
    """Record gateway payloads on the client and notify the target."""

    async def __call__(self, payload, topic):
        """Store ``payload`` under its gateway name, then call the target.

        ``topic`` must split on ``'/'`` into exactly two parts; the second
        is the key used in the client's ``_gateways`` mapping. The target
        receives that whole mapping.
        """
        _prefix, gateway_name = topic.split('/')
        client = self.client
        client._gateways[gateway_name] = payload
        await self.call_target(client._gateways)