# pylint: disable=redefined-builtin
from ..observers import FunctionObserver
from ..value import Value
class impuls(Value):
    """Turn every emission of a wrapped observable into a short pulse.

    For each value received from ``observable`` this object is updated twice
    in a row: first with the mapped value, then immediately with ``default``.
    Errors reported by the wrapped observable are forwarded through
    ``update`` as well.

    Args:
        observable: Source object; only its ``subscribe(observer)`` method is
            used here, and the returned object must offer ``dispose()``.
        map: Either a callable applied to each incoming value, or a constant
            that replaces every incoming value. Defaults to the identity
            function. (The name shadows the builtin on purpose; it is part of
            the public signature.)
        default: Value pushed right after each mapped value.

    NOTE(review): ``_became_subscribed`` / ``_became_unsubscribed`` are
    presumably hooks called by the ``Value`` base class when the first
    subscriber arrives / the last one leaves -- the base class is not visible
    here, confirm against ``Value``.
    """

    def __init__(self, observable, map=lambda x: x, default=None):
        super().__init__(None)
        self._wrapped = observable
        # A non-callable ``map`` is treated as a constant replacement value.
        self._map = map if callable(map) else lambda _: map
        self._default = default
        # Handle returned by ``observable.subscribe``; ``None`` while this
        # object is not subscribed to the wrapped observable.
        self._subscription = None

    def _became_subscribed(self):
        """Subscribe to the wrapped observable and remember the handle."""
        observer = FunctionObserver()
        observer.subscribe_next(self._on_next)
        observer.subscribe_error(self._on_error)
        self._subscription = self._wrapped.subscribe(observer)

    def _became_unsubscribed(self):
        """Dispose the upstream subscription, if there is one.

        Safe to call when no subscription exists (e.g. before the first
        subscribe, or twice in a row): the handle is cleared before it is
        disposed so it is never disposed more than once.
        """
        subscription, self._subscription = self._subscription, None
        if subscription is not None:
            subscription.dispose()

    async def _on_next(self, value, **_):
        """Emit the mapped ``value`` and then fall back to the default."""
        await self.update(self._map(value))
        await self.update(self._default)

    async def _on_error(self, error, **_):
        """Forward an upstream error by passing it to ``update``."""
        await self.update(error)