Source code for caspia.reactive.operations.lambda_

# pylint: disable=redefined-builtin
import inspect
import logging
from functools import partial

from ..observable import Observable
from ..observers import FunctionObserver

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


class lambda_(Observable):
    """Observable whose value is a callable applied to other observables.

    The current value of every argument is passed positionally to ``lam``;
    when ``lam`` returns an awaitable, that awaitable is awaited.  While
    subscribed, the instance listens to every argument and triggers with the
    recomputed result whenever it differs from the previously handled one.

    Args:
        lam: Callable (plain or async) taking one positional value per
            argument.
        *args: Source objects.  Each must provide an awaitable ``observe()``
            and a ``subscribe(observer)`` whose result has ``dispose()``.
        desc: Optional ``str.format`` template used by ``__str__``; its
            positional fields are filled with ``args``.  A falsy value
            selects the default ``lambda({}, {}, ...)`` template.
    """

    def __init__(self, lam, *args, desc=None):
        super().__init__()
        self._lambda = lam
        self._args = list(args)
        self._subscriptions = []
        if desc:
            self._desc = desc
        else:
            # One '{}' placeholder per argument, filled in lazily by __str__.
            placeholders = ', '.join(['{}'] * len(self._args))
            self._desc = 'lambda(' + placeholders + ')'

    async def _evaluate(self, known_values=None):
        """Apply the callable to the arguments' values and return the result.

        Args:
            known_values: Optional mapping of argument index to an already
                known value; those arguments are not observed again.

        Returns:
            The callable's result, awaited first if it is awaitable.
        """
        known_values = known_values or {}
        values = []
        for idx, arg in enumerate(self._args):
            if idx in known_values:
                values.append(known_values[idx])
            else:
                values.append(await arg.observe())

        result = self._lambda(*values)
        if inspect.isawaitable(result):
            return await result
        return result

    async def _trigger_if_changed(self, value):
        """Trigger with ``value`` unless it equals the last handled value."""
        # `_last_value` is not assigned in __init__, so its absence marks the
        # very first notification, which is always propagated.
        if not hasattr(self, '_last_value') or self._last_value != value:
            await self.trigger(value)
        self._last_value = value

    async def _on_next_value(self, value, arg_idx, **_):
        """Recompute the result after argument ``arg_idx`` produced ``value``.

        An exception raised during evaluation is not propagated; the
        exception object itself is forwarded as the new value.
        """
        try:
            new_value = await self._evaluate(known_values={arg_idx: value})
        except Exception as e:  # pylint: disable=broad-except
            new_value = e
        await self._trigger_if_changed(new_value)

    async def _on_error_value(self, error, **_):
        """Forward an error reported by one of the arguments as the value."""
        await self._trigger_if_changed(error)

    # Expose the error handler as an attribute of the next-value handler.
    # NOTE(review): presumably consumed by observer plumbing elsewhere -
    # confirm before removing.
    _on_next_value.error = _on_error_value

    async def observe(self):
        """Evaluate the callable against the arguments' current values."""
        return await self._evaluate()

    def _became_subscribed(self):
        """Subscribe to every argument, tagging updates with its index."""
        self._subscriptions = []
        for idx, arg in enumerate(self._args):
            observer = FunctionObserver()
            observer.subscribe_next(partial(self._on_next_value, arg_idx=idx))
            observer.subscribe_error(self._on_error_value)
            self._subscriptions.append(arg.subscribe(observer))

    def _became_unsubscribed(self):
        """Dispose of every argument subscription and forget them."""
        for sub in self._subscriptions:
            sub.dispose()
        self._subscriptions = []

    def __str__(self):
        return '<' + self._desc.format(*self._args) + '>'

    def __repr__(self):
        return str(self)