Source code for caspia.reactive.operations.lowpass

import asyncio
import statistics
from collections import namedtuple

from ..errors import ValueNotReady
from ..observable import Observable
from ..observers import FunctionObserver

Sample = namedtuple('Sample', ['time', 'value'])


[docs]class lowpass(Observable): def __init__(self, input_: Observable, window=60, algorithm='mean', loop=None): super().__init__() self.loop = loop or asyncio.get_event_loop() self.input_ = input_ self.samples = [] # type: Iterable[Sample] self.window = window self.algorithm = algorithm observer = FunctionObserver() observer.subscribe_next(self._on_next) observer.subscribe_error(self._on_error) self.input_.subscribe(observer) self.error = None def _remove_old_samples(self): current_time = self.current_time self.samples = [s for s in self.samples if current_time - s.time <= self.window] def _current_output(self): if self.samples: if self.algorithm == 'mean': return statistics.mean(s.value for s in self.samples) elif self.algorithm == 'median': return statistics.median(s.value for s in self.samples) else: raise NotImplementedError else: raise ValueNotReady('No sample available for current window') @property def current_time(self): return self.loop.time() async def _on_next(self, value, **kwargs): self.error = None current_time = self.current_time sample = Sample(current_time, value) self.samples.append(sample) self._remove_old_samples() out = self._current_output() await self.trigger(out) async def _on_error(self, error, **kwargs): self.error = error self.samples = [] await self.trigger(error)
[docs] async def observe(self): if self.error is None: self._remove_old_samples() return self._current_output() else: raise self.error # pylint: disable=raising-bad-type