# pylint: disable=protected-access
import asyncio
import logging
from caspia.meadow import errors, utils
from caspia.toolbox import monitor
logger = logging.getLogger(__name__)
class BaseSubscriber:
    """Meadow's topic subscriber.

    Pairs a topic with a target and offers small helpers for reading the
    fields of a message payload. ``client`` starts out as ``None``;
    ``unsubscribe`` only does something once it has been assigned.
    """

    def __init__(self, topic, target):
        self.client = None
        self.target = target
        self.topic = topic

    def unsubscribe(self):
        """Detach this subscriber from its client, if it has one."""
        client = self.client
        if not client:
            return
        client.detach_subscriber(self)

    async def call_target(self, *args, **kwargs):
        """Invoke the target via ``utils.acall`` and hand back its result."""
        outcome = await utils.acall(self.target, *args, **kwargs)
        return outcome

    def is_future(self):
        """Return True when the target is an ``asyncio.Future``."""
        target = self.target
        return isinstance(target, asyncio.Future)

    def is_coroutine(self):
        """Return True when the target is a coroutine function."""
        target = self.target
        return asyncio.iscoroutinefunction(target)

    def _extract_value(self, payload):
        """Return the payload's ``'val'`` entry, or None when absent."""
        return payload.get('val')

    def _extract_req_id(self, payload):
        """Return the payload's ``'req_id'`` entry, or None when absent."""
        return payload.get('req_id')

    def _extract_error(self, payload):
        """Build an error object from the payload with ``MeadowError.from_dict``."""
        return errors.MeadowError.from_dict(payload)

    def _extract_extra(self, payload):
        """Return the payload's ``'extra'`` entry, or a new dict when absent."""
        return payload.get('extra', {})
class AnonymousSubscriber(BaseSubscriber):
    """Subscriber that forwards the raw payload to its target."""

    async def __call__(self, payload, topic):
        """Call the target with ``payload`` only; ``topic`` is not used."""
        outcome = await self.call_target(payload)
        return outcome
class ReadResponseSubscriber(BaseSubscriber):
    """Complete a Future target from a read-response payload."""

    async def __call__(self, payload, topic):
        """Resolve or fail the target Future according to ``payload``.

        A payload containing ``'val'`` resolves the Future with a
        ``(value, extra)`` tuple. Any other payload is converted with
        ``_extract_error`` and set as the Future's exception.

        Raises:
            TypeError: if the target is not an ``asyncio.Future``.
        """
        # ``is_future`` is a method and has to be called: the bare attribute
        # is a bound method object, which is always truthy, so the guard
        # could never trigger.
        if not self.is_future():
            raise TypeError('target in ReadResponseSubscriber must be a Future')
        logger.debug('Got read response %s %s', self.topic, payload)
        if 'val' not in payload:
            error = self._extract_error(payload)
            self.target.set_exception(error)
        else:
            val = self._extract_value(payload)
            extra = self._extract_extra(payload)
            self.target.set_result((val, extra))
class WriteResponseSubscriber(BaseSubscriber):
    """Complete a Future target from a write-response payload."""

    async def __call__(self, payload, topic):
        """Resolve or fail the target Future according to ``payload``.

        A falsy (e.g. empty) payload resolves the Future with ``None``.
        Any other payload is converted with ``_extract_error`` and set as
        the Future's exception.

        Raises:
            TypeError: if the target is not an ``asyncio.Future``.
        """
        # ``is_future`` is a method and has to be called: the bare attribute
        # is a bound method object, which is always truthy, so the guard
        # could never trigger.
        if not self.is_future():
            raise TypeError('target in WriteResponseSubscriber must be a Future')
        logger.debug('Got write response %s %s', self.topic, payload)
        if not payload:
            self.target.set_result(None)
        else:
            error = self._extract_error(payload)
            self.target.set_exception(error)
class NotificationSubscriber(BaseSubscriber):
    """Route notification payloads to a value callback or an error callback."""

    def __init__(self, topic, on_value, on_error):
        super().__init__(topic, on_value)
        self.on_error = on_error
        self.on_value = on_value

    async def __call__(self, payload, topic):
        """Dispatch ``payload`` to ``on_value`` or ``on_error``.

        ``topic`` must split on ``'/'`` into exactly three parts; the last
        two are passed to the callback as ``service`` and ``characteristic``.
        A payload with ``'val'`` goes to ``on_value``; one without goes to
        ``on_error``. If the matching callback is falsy, nothing is called.
        """
        _, service, characteristic = topic.split('/')
        names = {'service': service, 'characteristic': characteristic}

        if 'val' in payload:
            if not self.on_value:
                return
            value = self._extract_value(payload)
            extra = self._extract_extra(payload)
            # call_target dispatches to self.target, so point it at the
            # callback chosen for this payload first.
            self.target = self.on_value
            await self.call_target(value=value, extra=extra, **names)
            return

        if self.on_error:
            failure = self._extract_error(payload)
            self.target = self.on_error
            await self.call_target(error=failure, **names)
class ReadSubscriber(BaseSubscriber):
    """Serve read requests by calling the target and publishing a response."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # While False, characteristics whose name starts with '$' are ignored.
        self.allow_dolar_characteristics = False

    async def __call__(self, request, topic):
        """Handle one read request received on ``topic``.

        ``topic`` must split on ``'/'`` into exactly three parts; the last
        two are the service and the characteristic. The target's return
        value (optionally a ``(value, extra)`` tuple) becomes the response
        payload; an exception from the target becomes an error payload.
        The response is published on
        ``response/<service>/<characteristic>/<req_id>``.
        """
        _, service, char = topic.split('/')
        if not self.allow_dolar_characteristics and char.startswith('$'):
            return

        # The target only receives service/characteristic names when the
        # subscription topic itself contains '+'.
        if '+' in self.topic:
            call_kwargs = {'service': service, 'characteristic': char}
        else:
            call_kwargs = {}

        try:
            outcome = await self.call_target(**call_kwargs)
            if isinstance(outcome, tuple):
                # Unpacking stays inside the try: a tuple of the wrong
                # length is reported as an error response too.
                value, extra = outcome
                payload = {'val': value, 'extra': extra}
            else:
                payload = {'val': outcome}
        except Exception as exc:  # pylint: disable=broad-except
            payload = errors.MeadowError.from_exception(exc).to_dict()

        req_id = self._extract_req_id(request)
        response_topic = '/'.join(('response', service, char, req_id))
        await self.client._publish(response_topic, payload)

        monitor.record_metric('meadow-connection:handle-read-ops', 1)
        if 'val' not in payload:
            monitor.record_metric('meadow-connection:handle-read-err', 1)
class WriteSubscriber(BaseSubscriber):
    """Serve write requests by calling the target and publishing a response."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # While False, characteristics whose name starts with '$' are ignored.
        self.allow_dolar_characteristics = False

    async def __call__(self, request, topic):
        """Handle one write request received on ``topic``.

        ``topic`` must split on ``'/'`` into exactly three parts; the last
        two are the service and the characteristic. The target is called
        with the request's value and ``extra``. Success publishes an empty
        payload; an exception from the target publishes an error payload.
        The response goes to ``response/<service>/<characteristic>/<req_id>``.
        """
        _method, service, char = topic.split('/')
        if not self.allow_dolar_characteristics and char.startswith('$'):
            return

        # The target only receives service/characteristic names when the
        # subscription topic itself contains '+'.
        if '+' in self.topic:
            call_kwargs = {'service': service, 'characteristic': char}
        else:
            call_kwargs = {}

        req_id = self._extract_req_id(request)
        value = self._extract_value(request)
        extra = self._extract_extra(request)

        try:
            await self.call_target(value, extra=extra, **call_kwargs)
            payload = {}
        except Exception as exc:  # pylint: disable=broad-except
            payload = errors.MeadowError.from_exception(exc).to_dict()

        response_topic = '/'.join(('response', service, char, req_id))
        await self.client._publish(response_topic, payload)

        monitor.record_metric('meadow-connection:handle-write-ops', 1)
        if payload:
            # A non-empty payload is an error response.
            monitor.record_metric('meadow-connection:handle-write-err', 1)
class WriteSniffer(BaseSubscriber):
    """Pass write requests to the target without publishing a response."""

    async def __call__(self, request, topic):
        """Call the target with the request's value and ``extra``.

        ``topic`` must split on ``'/'`` into exactly three parts. The
        service and characteristic names are passed as keyword arguments
        only when the subscription topic contains ``'+'``.
        """
        _method, service, char = topic.split('/')
        if '+' in self.topic:
            call_kwargs = {'service': service, 'characteristic': char}
        else:
            call_kwargs = {}
        value = self._extract_value(request)
        extra = self._extract_extra(request)
        await self.call_target(value, extra=extra, **call_kwargs)
class RulesSubscriber(BaseSubscriber):
    """Subscriber that hands the payload's ``'rules'`` entry to the target."""

    async def __call__(self, payload, topic):
        """Call the target with ``payload['rules']``; ``topic`` is not used."""
        rules = payload['rules']
        await self.call_target(rules)
class GatewaySubscriber(BaseSubscriber):
    """Record per-gateway payloads on the client and notify the target."""

    async def __call__(self, payload, topic):
        """Store ``payload`` under the gateway name, then call the target.

        ``topic`` must split on ``'/'`` into exactly two parts; the second
        is used as the key into the client's ``_gateways`` mapping. The
        target receives that whole mapping.
        """
        _, gateway = topic.split('/')
        known_gateways = self.client._gateways
        known_gateways[gateway] = payload
        await self.call_target(known_gateways)