Source code for caspia.meadow.testing.connection

import asyncio
from collections import defaultdict
from typing import Dict

from caspia import reactive as r
from caspia.meadow.client import ConsumerConnection
from caspia.meadow.client.connection.subscriber import NotificationSubscriber


class MockConsumerConnection(ConsumerConnection):
    """In-memory replacement for ``ConsumerConnection`` intended for tests.

    Characteristics are backed by reactive observables that the test
    registers through item assignment::

        connection['service', 'characteristic'] = observable

    ``read``, ``write`` and ``subscribe`` then operate on the registered
    observable instead of a real connection.  If the registered observable
    has a truthy ``with_extra`` attribute, its values are treated as
    ``(value, extra)`` pairs; otherwise ``read`` pairs the plain value with
    an empty ``extra`` dict and ``write`` stores only the value.
    """

    def __init__(self):  # pylint: disable=super-init-not-called
        """Set up empty mock state without running the parent initializer."""
        # Registered observables, keyed by "<service>:<characteristic>".
        self.mocks: Dict[str, r.Observable] = {}
        self.loop = asyncio.get_event_loop()
        # Topic -> set of subscribers; only read by ``mock_gateways`` here.
        self._subscriptions = defaultdict(set)
        # Reactive holder of the last gateways mapping that was registered.
        self._gateways = r.Value({})

    @property
    def connected(self):
        """Always report the mock connection as connected."""
        return True

    def _mock_key(self, service, characteristic):
        """Return the ``self.mocks`` key for a service/characteristic pair."""
        return f"{service}:{characteristic}"

    async def attach_subscriber(self, subscriber):
        """Accept and ignore ``subscriber``; the mock does nothing here."""

    async def mock_gateway_register(self, gateways):
        """Store ``gateways`` and forward them to gateway subscriptions.

        Returns whatever ``mock_gateways`` returns.
        """
        await self._gateways.update(gateways)
        return await self.mock_gateways(gateways)

    async def mock_gateways(self, gateways):
        """Call ``call_target(gateways)`` on each ``'gateway/+'`` subscription.

        NOTE(review): nothing visible in this class adds entries to
        ``self._subscriptions``, so tests presumably populate it directly.
        """
        for subscription in self._subscriptions['gateway/+']:
            await subscription.call_target(gateways)

    async def read(self, service, characteristic, timeout=None, req_id=None):
        """Read the current value of a mocked characteristic.

        ``timeout`` and ``req_id`` are accepted for signature compatibility
        and are not used.

        Returns:
            The observed value as-is when the mock has a truthy
            ``with_extra`` attribute, otherwise ``(value, {})``.

        Raises:
            KeyError: if no mock is registered for the characteristic.
        """
        # Bind the mock once so the ``with_extra`` check applies to the very
        # object that was observed, even if the entry is replaced meanwhile.
        mock = self.mocks[self._mock_key(service, characteristic)]
        retval = await mock.observe()
        if getattr(mock, 'with_extra', False):
            return retval
        return (retval, {})

    async def write(self, service, characteristic, value, extra=None, timeout=None,
                    req_id=None):
        """Update a mocked characteristic with ``value``.

        When the mock has a truthy ``with_extra`` attribute the update is the
        tuple ``(value, extra)``; otherwise ``extra`` is dropped.  ``timeout``
        and ``req_id`` are accepted for signature compatibility and unused.

        Raises:
            KeyError: if no mock is registered for the characteristic.
        """
        mock = self.mocks[self._mock_key(service, characteristic)]
        if getattr(mock, 'with_extra', False):
            await mock.update((value, extra))
        else:
            await mock.update(value)

    async def subscribe(self, service, characteristic, on_value, on_error=None):
        """Subscribe ``on_value`` to updates of a mocked characteristic.

        Each update of the registered observable is passed to a
        ``NotificationSubscriber`` as ``{'val': value}`` together with the
        notification topic.

        Returns:
            The ``NotificationSubscriber``; its ``unsubscribe`` attribute is
            set to the ``dispose`` of the underlying observable subscription.

        Raises:
            KeyError: if no mock is registered for the characteristic.
        """
        # ``_notification_topic`` is not defined in this class; presumably it
        # is inherited from ``ConsumerConnection``.
        topic = self._notification_topic(service, characteristic)
        subscription = NotificationSubscriber(topic, on_value, on_error)

        async def on_update(value):
            await subscription({'val': value}, topic)

        subscription.unsubscribe = self[service, characteristic].subscribe(on_update).dispose
        return subscription

    async def subscribe_gateways_update(self, target):
        """Call ``target`` with the current gateways, then subscribe it.

        Returns:
            The result of subscribing ``target`` to the gateways value.
        """
        gateways = await self._gateways.observe()
        await target(gateways)
        return self._gateways.subscribe(target)

    def __getitem__(self, spec):
        """Return the mock registered for ``spec = (service, characteristic)``.

        Raises:
            KeyError: if no mock is registered for ``spec``.
        """
        service, characteristic = spec
        return self.mocks[self._mock_key(service, characteristic)]

    def __setitem__(self, spec, value):
        """Register ``value`` as the mock for ``spec = (service, characteristic)``."""
        service, characteristic = spec
        self.mocks[self._mock_key(service, characteristic)] = value