Source code for caspia.meadow.client.connection.consumer

# pylint: disable=too-many-locals
import asyncio
import logging
import uuid

from caspia.toolbox import monitor

from .connection import Connection
from .subscriber import (GatewaySubscriber, NotificationSubscriber, ReadResponseSubscriber,
                         RulesSubscriber, WriteResponseSubscriber, WriteSniffer)

logger = logging.getLogger(__name__)


[docs]class ConsumerConnection(Connection): def __init__(self, broker_url, name=None, loop=None): super().__init__(broker_url, name=name, loop=loop) self._gateways = dict() # gateway: services # some metrics are handled in subscribers labels = dict(connection_name=str(name)) monitor.register_metric('meadow-connection:read-ops', 'integer', collect=True, labels=labels) monitor.register_metric('meadow-connection:read-err', 'integer', collect=True, labels=labels) monitor.register_metric('meadow-connection:write-ops', 'integer', collect=True, labels=labels) monitor.register_metric('meadow-connection:write-err', 'integer', collect=True, labels=labels) monitor.register_metric('meadow-connection:notifications-in', 'integer', collect=True, labels=labels) monitor.register_metric('meadow-connection:request', 'integer', labels=labels)
[docs] async def subscribe_gateways_update(self, func): subscriber = GatewaySubscriber('gateway/+', func) self.attach_subscriber(subscriber) await subscriber.call_target(self._gateways) return subscriber
[docs] async def read(self, service, characteristic, timeout=None, req_id=None): req_id = req_id or str(uuid.uuid4()) start_time = self.loop.time() logger.debug('Read value for %s/%s', service, characteristic) response_topic = self._response_topic(service, characteristic, req_id) # Subscribe for response response_future = asyncio.Future() subscriber = ReadResponseSubscriber(response_topic, response_future) flag_error, flag_timeout = False, False try: self.attach_subscriber(subscriber) await self.subscriber_ready(subscriber) # Send read request request_topic = self._read_topic(service, characteristic) # pylint: disable=assignment-from-no-return read_request = self._publish(request_topic, {'req_id': req_id}) await asyncio.wait_for(read_request, timeout=timeout) monitor.record_metric('meadow-connection:read-ops', 1) # Await response value = await asyncio.wait_for(response_future, timeout=timeout) except Exception as e: flag_error, flag_timeout = True, isinstance(e, asyncio.TimeoutError) monitor.record_metric('meadow-connection:read-err', 1) raise finally: self.detach_subscriber(subscriber) monitor.record_metric( 'meadow-connection:request', { 'value': 1, 'duration': self.loop.time() - start_time, 'error': flag_error, 'timeout': flag_timeout, 'type': 'read' }) return value
[docs] async def write(self, service, characteristic, value, extra=None, timeout=None, req_id=None): start_time = self.loop.time() payload = dict(val=value, extra=extra) if extra else dict(val=value) logger.debug('Write %s to %s/%s', value, service, characteristic) req_id = req_id or str(uuid.uuid4()) payload['req_id'] = req_id # Subscribe to response response_future = asyncio.Future() response_topic = self._response_topic(service, characteristic, req_id) subscriber = WriteResponseSubscriber(response_topic, response_future) flag_error, flag_timeout = False, False try: self.attach_subscriber(subscriber) await self.subscriber_ready(subscriber) # Write await self._publish(self._write_topic(service, characteristic), payload) monitor.record_metric('meadow-connection:write-ops', 1) # Wait for response await asyncio.wait_for(response_future, timeout=timeout) except Exception as e: flag_error, flag_timeout = True, isinstance(e, asyncio.TimeoutError) monitor.record_metric('meadow-connection:write-err', 1) raise finally: self.detach_subscriber(subscriber) monitor.record_metric( 'meadow-connection:request', { 'value': 1, 'duration': self.loop.time() - start_time, 'error': flag_error, 'timeout': flag_timeout, 'type': 'write_ack' })
[docs] async def subscribe(self, service, characteristic, on_value, on_error=None): """Subscribe for ``characteristic`` notification on ``service``. :param service: Service name or None for all services. :param characteristic: Characteristic or None for all characteristics. :param on_value: callable(value=, extra=, service=, characteristic=) :param on_error: callable(error=, service=, characteristic=) or None :returns: An subscription object with .unsubscribe method. """ service = service or '+' characteristic = characteristic or '+' topic = self._notification_topic(service, characteristic) subscriber = NotificationSubscriber(topic, on_value, on_error) self.attach_subscriber(subscriber) return subscriber
[docs] def listen_writes(self, service, characteristic, target): """Listen for writes on a given characteristic. This is a way for one client to observe, that another client issued a write. This is not for responding to write commands (see ``GatewayConnection`` instead). """ topic = self._write_topic(service, characteristic or '+') logger.debug('Adding write listener for %s', topic) subscriber = WriteSniffer(topic, target) self.attach_subscriber(subscriber) return subscriber
# # Rules #
[docs] async def request_rules(self, gateway, rules): """Request activation of rules from a gateway. Use `subscribe_active_rules` to see activated rules. :param rules: iterable of serialized Rule instances """ topic = 'gateway/{}/requested_rules'.format(gateway) payload = dict(rules=list(rules)) await self._publish(topic, payload, retain=True)
[docs] async def subscribe_rules_update(self, gateway, target): """Subscribe for rule activation updates from a gateway. :param target: callable(rules: list of serialized Rule instances) :returns: an object with .unsubscribe() method. """ topic = 'gateway/{}/active_rules'.format(gateway) subscriber = RulesSubscriber(topic, target) self.attach_subscriber(subscriber) return subscriber