# pylint: disable=redefined-builtin
import inspect
import logging
from functools import partial
from ..observable import Observable
from ..observers import FunctionObserver
logger = logging.getLogger(__name__)
[docs]class lambda_(Observable):
def __init__(self, lam, *args, desc=None):
super().__init__()
self._lambda = lam
self._args = list(args)
self._subscriptions = []
if desc:
self._desc = desc
else:
self._desc = 'lambda('
self._desc += ', '.join('{}' for arg in self._args)
self._desc += ')'
async def _evaluate(self, known_values=None):
known_values = known_values or dict()
values = []
for idx in range(len(self._args)):
if idx in known_values:
values.append(known_values[idx])
else:
values.append(await self._args[idx].observe())
result = self._lambda(*values)
if inspect.isawaitable(result):
return await result
else:
return result
async def _on_next_value(self, value, arg_idx, **_):
try:
new_value = await self._evaluate(known_values={arg_idx: value})
except Exception as e: # pylint: disable=broad-except
new_value = e
if not hasattr(self, '_last_value') or self._last_value != new_value:
await self.trigger(new_value)
self._last_value = new_value
async def _on_error_value(self, error, **_):
if not hasattr(self, '_last_value') or self._last_value != error:
await self.trigger(value=error)
self._last_value = error
_on_next_value.error = _on_error_value
[docs] async def observe(self):
return await self._evaluate()
def _became_subscribed(self):
self._subscriptions = []
for idx, arg in enumerate(self._args):
observer = FunctionObserver()
observer.subscribe_next(partial(self._on_next_value, arg_idx=idx))
observer.subscribe_error(self._on_error_value)
subscription = arg.subscribe(observer)
self._subscriptions.append(subscription)
def _became_unsubscribed(self):
for sub in self._subscriptions:
sub.dispose()
self._subscriptions = []
def __str__(self):
return '<' + self._desc.format(*self._args) + '>'
def __repr__(self):
return str(self)