Source code for caspia.reactive.operations.filter

# pylint: disable=redefined-builtin
from .cached import cached


class filter(cached):
    """Operation that passes on only the values accepted by a predicate.

    Values for which ``lambda_`` returns a truthy result are handed to the
    ``cached`` base class unchanged.  What happens to rejected values
    depends on ``mode``:

    * ``'none'`` -- ``None`` is handed to the base class instead, unless
      the instance's current ``value`` attribute is already ``None``.
    * anything else (the default is ``'previous'``) -- the rejected value
      is dropped and nothing is forwarded.
      NOTE(review): presumably this leaves the previously cached value in
      place, as the mode name suggests -- confirm against ``cached``.
    """

    def __init__(self, lambda_, observable, mode='previous', **kwargs):
        """Store the predicate and mode; the rest goes to the base class.

        Args:
            lambda_: callable invoked with each incoming value; its
                truthiness decides whether the value is forwarded.
            observable: source passed straight to ``cached.__init__``.
            mode: handling of rejected values (see the class docstring).
            **kwargs: forwarded unchanged to ``cached.__init__``.
        """
        super().__init__(observable, **kwargs)
        self._mode = mode
        self._filter = lambda_

    async def on_next(self, value, **kwargs):
        """Forward ``value`` if the predicate accepts it.

        Args:
            value: the incoming item to test.
            **kwargs: forwarded unchanged to the base class ``on_next``.
        """
        if self._filter(value):
            await super().on_next(value, **kwargs)
            return

        # Rejected value: only the 'none' mode reacts to it.
        if self._mode != 'none':
            return

        # The 'nonnone' default makes a missing ``value`` attribute count
        # as "not None", so None is still emitted in that case.
        current = getattr(self, 'value', 'nonnone')
        if current is None:
            # Already None -- do not forward None again.
            return

        await super().on_next(None, **kwargs)

    def __str__(self):
        """Return a short description naming the wrapped observable."""
        template = '<filter of {}>'
        return template.format(self.observable)