# pylint: disable=too-many-locals
import asyncio
import logging
import uuid
from caspia.toolbox import monitor
from .connection import Connection
from .subscriber import (GatewaySubscriber, NotificationSubscriber, ReadResponseSubscriber,
RulesSubscriber, WriteResponseSubscriber, WriteSniffer)
logger = logging.getLogger(__name__)
[docs]class ConsumerConnection(Connection):
def __init__(self, broker_url, name=None, loop=None):
super().__init__(broker_url, name=name, loop=loop)
self._gateways = dict() # gateway: services
# some metrics are handled in subscribers
labels = dict(connection_name=str(name))
monitor.register_metric('meadow-connection:read-ops',
'integer',
collect=True,
labels=labels)
monitor.register_metric('meadow-connection:read-err',
'integer',
collect=True,
labels=labels)
monitor.register_metric('meadow-connection:write-ops',
'integer',
collect=True,
labels=labels)
monitor.register_metric('meadow-connection:write-err',
'integer',
collect=True,
labels=labels)
monitor.register_metric('meadow-connection:notifications-in',
'integer',
collect=True,
labels=labels)
monitor.register_metric('meadow-connection:request', 'integer', labels=labels)
[docs] async def subscribe_gateways_update(self, func):
subscriber = GatewaySubscriber('gateway/+', func)
self.attach_subscriber(subscriber)
await subscriber.call_target(self._gateways)
return subscriber
[docs] async def read(self, service, characteristic, timeout=None, req_id=None):
req_id = req_id or str(uuid.uuid4())
start_time = self.loop.time()
logger.debug('Read value for %s/%s', service, characteristic)
response_topic = self._response_topic(service, characteristic, req_id)
# Subscribe for response
response_future = asyncio.Future()
subscriber = ReadResponseSubscriber(response_topic, response_future)
flag_error, flag_timeout = False, False
try:
self.attach_subscriber(subscriber)
await self.subscriber_ready(subscriber)
# Send read request
request_topic = self._read_topic(service, characteristic)
# pylint: disable=assignment-from-no-return
read_request = self._publish(request_topic, {'req_id': req_id})
await asyncio.wait_for(read_request, timeout=timeout)
monitor.record_metric('meadow-connection:read-ops', 1)
# Await response
value = await asyncio.wait_for(response_future, timeout=timeout)
except Exception as e:
flag_error, flag_timeout = True, isinstance(e, asyncio.TimeoutError)
monitor.record_metric('meadow-connection:read-err', 1)
raise
finally:
self.detach_subscriber(subscriber)
monitor.record_metric(
'meadow-connection:request', {
'value': 1,
'duration': self.loop.time() - start_time,
'error': flag_error,
'timeout': flag_timeout,
'type': 'read'
})
return value
[docs] async def write(self, service, characteristic, value, extra=None, timeout=None, req_id=None):
start_time = self.loop.time()
payload = dict(val=value, extra=extra) if extra else dict(val=value)
logger.debug('Write %s to %s/%s', value, service, characteristic)
req_id = req_id or str(uuid.uuid4())
payload['req_id'] = req_id
# Subscribe to response
response_future = asyncio.Future()
response_topic = self._response_topic(service, characteristic, req_id)
subscriber = WriteResponseSubscriber(response_topic, response_future)
flag_error, flag_timeout = False, False
try:
self.attach_subscriber(subscriber)
await self.subscriber_ready(subscriber)
# Write
await self._publish(self._write_topic(service, characteristic), payload)
monitor.record_metric('meadow-connection:write-ops', 1)
# Wait for response
await asyncio.wait_for(response_future, timeout=timeout)
except Exception as e:
flag_error, flag_timeout = True, isinstance(e, asyncio.TimeoutError)
monitor.record_metric('meadow-connection:write-err', 1)
raise
finally:
self.detach_subscriber(subscriber)
monitor.record_metric(
'meadow-connection:request', {
'value': 1,
'duration': self.loop.time() - start_time,
'error': flag_error,
'timeout': flag_timeout,
'type': 'write_ack'
})
[docs] async def subscribe(self, service, characteristic, on_value, on_error=None):
"""Subscribe for ``characteristic`` notification on ``service``.
:param service: Service name or None for all services.
:param characteristic: Characteristic or None for all characteristics.
:param on_value: callable(value=, extra=, service=, characteristic=)
:param on_error: callable(error=, service=, characteristic=) or None
:returns: An subscription object with .unsubscribe method.
"""
service = service or '+'
characteristic = characteristic or '+'
topic = self._notification_topic(service, characteristic)
subscriber = NotificationSubscriber(topic, on_value, on_error)
self.attach_subscriber(subscriber)
return subscriber
[docs] def listen_writes(self, service, characteristic, target):
"""Listen for writes on a given characteristic.
This is a way for one client to observe, that another client issued a write.
This is not for responding to write commands (see ``GatewayConnection`` instead).
"""
topic = self._write_topic(service, characteristic or '+')
logger.debug('Adding write listener for %s', topic)
subscriber = WriteSniffer(topic, target)
self.attach_subscriber(subscriber)
return subscriber
#
# Rules
#
[docs] async def request_rules(self, gateway, rules):
"""Request activation of rules from a gateway.
Use `subscribe_active_rules` to see activated rules.
:param rules: iterable of serialized Rule instances
"""
topic = 'gateway/{}/requested_rules'.format(gateway)
payload = dict(rules=list(rules))
await self._publish(topic, payload, retain=True)
[docs] async def subscribe_rules_update(self, gateway, target):
"""Subscribe for rule activation updates from a gateway.
:param target: callable(rules: list of serialized Rule instances)
:returns: an object with .unsubscribe() method.
"""
topic = 'gateway/{}/active_rules'.format(gateway)
subscriber = RulesSubscriber(topic, target)
self.attach_subscriber(subscriber)
return subscriber