Source code for caspia.reactive.operations.impuls

# pylint: disable=redefined-builtin
from ..observers import FunctionObserver
from ..value import Value


class impuls(Value):
    """Value that emits a short "impulse" for every item of an observable.

    Each time the wrapped observable produces an item, this value is first
    updated with the mapped item and then immediately updated again with
    ``default``.

    Args:
        observable: Source object; it is subscribed to through its
            ``subscribe(observer)`` method once this value becomes
            subscribed.
        map: Either a callable applied to every incoming item, or a
            non-callable constant that is emitted in place of every item.
            Defaults to the identity function.
        default: The value emitted right after each mapped item.
    """

    def __init__(self, observable, map=lambda x: x, default=None):
        # NOTE(review): ``None`` is presumably the initial value expected by
        # ``Value.__init__`` -- confirm against the base class.
        super().__init__(None)
        self._wrapped = observable
        # A non-callable ``map`` is wrapped so that it behaves like a
        # function returning that constant regardless of its argument.
        self._map = map if callable(map) else lambda _: map
        self._default = default
        # Declared here (not only in ``_became_subscribed``) so that
        # ``_became_unsubscribed`` never hits a missing attribute.
        self._subscription = None

    def _became_subscribed(self):
        """Subscribe to the wrapped observable and keep the subscription."""
        observer = FunctionObserver()
        observer.subscribe_next(self._on_next)
        observer.subscribe_error(self._on_error)
        self._subscription = self._wrapped.subscribe(observer)

    def _became_unsubscribed(self):
        """Dispose of the subscription to the wrapped observable, if any."""
        # Detach the handle before disposing so that a repeated call (or a
        # call without a prior subscription) is a harmless no-op.
        subscription, self._subscription = self._subscription, None
        if subscription is not None:
            subscription.dispose()

    async def _on_next(self, value, **_):
        """Emit the mapped ``value`` and then fall back to the default.

        Extra keyword arguments passed by the observer are ignored.
        """
        await self.update(self._map(value))
        await self.update(self._default)

    async def _on_error(self, error, **_):
        """Pass an upstream ``error`` on through ``update``.

        Extra keyword arguments passed by the observer are ignored.
        """
        await self.update(error)