import asyncio
import logging
from ..errors import ValueNotReady
from ..observable import Observable
from ..observers import FunctionObserver
logger = logging.getLogger(__name__)
class cached(Observable):
    """Observable that remembers the latest value or error of another observable.

    On construction an initial ``observe()`` call on the wrapped observable
    is scheduled as a task, and an observer is subscribed so that later
    values and errors replace whatever is cached. The cached item is stored
    in the ``value`` attribute, which does not exist until the first value
    or error has been received. Errors are cached as exception instances
    and re-raised by :meth:`observe`.
    """

    def __init__(self, observable, loop=None):
        """Wrap *observable* and start loading its initial value.

        :param observable: object providing ``observe()`` and ``subscribe()``.
        :param loop: event loop used to schedule the initial load; falls back
            to ``asyncio.get_event_loop()`` when omitted.
        """
        super().__init__()
        self._loop = loop or asyncio.get_event_loop()
        self._observable = observable
        # Start the initial fetch immediately; observe() waits on this task
        # when nothing has been cached yet.
        self._load = asyncio.ensure_future(self._load_initial(), loop=self._loop)
        observer = FunctionObserver()
        observer.subscribe_next(self._on_next)
        observer.subscribe_error(self._on_error)
        self._observable.subscribe(observer)

    @property
    def observable(self):
        """The wrapped observable passed to the constructor."""
        return self._observable

    async def _load_initial(self):
        """Fetch the first value (or error) from the wrapped observable."""
        try:
            value = await self._observable.observe()
        except Exception as e:  # pylint: disable=broad-except
            value = e
        # A pushed value may have arrived while we were waiting; it is newer
        # than the result of this fetch, so do not overwrite it.
        if not hasattr(self, 'value'):
            if not isinstance(value, Exception):
                await self.on_next(value, initial=True)
            else:
                await self.on_error(value, initial=True)

    async def observe(self):
        """Return the cached value, waiting for the initial load if needed.

        :returns: the most recently cached value.
        :raises Exception: the cached exception, when the most recent item
            was an error.
        :raises ValueNotReady: when the initial load has finished but nothing
            was cached.
        """
        if hasattr(self, 'value'):
            if isinstance(self.value, Exception):
                raise self.value
            return self.value
        if not self._load.done():
            try:
                try:
                    # shield() keeps a cancellation of *this* caller from
                    # cancelling the shared load task other callers wait on.
                    await asyncio.shield(self._load)
                except asyncio.CancelledError:
                    # _on_next/_on_error cancel the load once a pushed item
                    # has been received; in that case the cache is about to
                    # be read below, so the cancellation is not an error.
                    # Otherwise the caller itself was cancelled: propagate.
                    if not self._load.cancelled():
                        raise
                return await self.observe()
            except Exception as e:  # pylint: disable=broad-except
                logger.error('Failure when loading initial value: %s', repr(e))
                raise
        raise ValueNotReady()

    async def _on_next(self, value, **kwargs):
        """Subscription callback: cache a pushed value and stop the initial load."""
        if not self._load.done():
            # The pushed value supersedes whatever the initial load would return.
            self._load.cancel()
        await self.on_next(value, **kwargs)

    async def _on_error(self, error, **kwargs):
        """Subscription callback: cache a pushed error and stop the initial load."""
        if not self._load.done():
            self._load.cancel()
        await self.on_error(error, **kwargs)

    async def on_next(self, value, **kwargs):
        """Cache *value* and pass it, with *kwargs*, to ``self.trigger``.

        ``trigger`` is not defined in this class; presumably it is inherited
        from ``Observable`` and notifies subscribers -- confirm there.
        """
        # Store first so that anything reacting to the trigger sees the new value.
        self.value = value
        await self.trigger(value, **kwargs)

    async def on_error(self, error, **kwargs):
        """Cache *error* and pass it, with *kwargs*, to ``self.trigger``."""
        self.value = error
        await self.trigger(error, **kwargs)

    def __str__(self):
        return '<{} {}>'.format(type(self), self.observable)