Source code for caspia.reactive.operations.cached

import asyncio
import logging

from ..errors import ValueNotReady
from ..observable import Observable
from ..observers import FunctionObserver

logger = logging.getLogger(__name__)


[docs]class cached(Observable): def __init__(self, observable, loop=None): super().__init__() self._loop = loop or asyncio.get_event_loop() self._observable = observable self._load = asyncio.ensure_future(self._load_initial(), loop=self._loop) observer = FunctionObserver() observer.subscribe_next(self._on_next) observer.subscribe_error(self._on_error) self._observable.subscribe(observer) @property def observable(self): return self._observable async def _load_initial(self): try: value = await self._observable.observe() except Exception as e: # pylint: disable=broad-except value = e if not hasattr(self, 'value'): if not isinstance(value, Exception): await self.on_next(value, initial=True) else: await self.on_error(value, initial=True)
[docs] async def observe(self): if hasattr(self, 'value'): if isinstance(self.value, Exception): raise self.value return self.value else: if not self._load.done(): try: await self._load return await self.observe() except Exception as e: # pylint: disable=broad-except logger.error('Failure when loading initial value: %s', repr(e)) raise raise ValueNotReady()
async def _on_next(self, value, **kwargs): if not self._load.done(): self._load.cancel() await self.on_next(value, **kwargs) async def _on_error(self, error, **kwargs): if not self._load.done(): self._load.cancel() await self.on_error(error, **kwargs)
[docs] async def on_next(self, value, **kwargs): self.value = value await self.trigger(value, **kwargs)
[docs] async def on_error(self, error, **kwargs): self.value = error await self.trigger(error, **kwargs)
def __str__(self): return '<{} {}>'.format(type(self), self.observable)