import asyncio
import statistics
from collections import namedtuple
from ..errors import ValueNotReady
from ..observable import Observable
from ..observers import FunctionObserver
# A single timestamped reading: ``time`` is when the value was recorded,
# ``value`` is the reading itself.
Sample = namedtuple(typename='Sample', field_names=('time', 'value'))
class lowpass(Observable):
    """Observable that smooths its input over a sliding time window.

    Every value received from ``input_`` is timestamped with the event
    loop's clock and buffered.  The output is the mean or the median of
    all buffered samples that are no older than ``window``.

    :param input_: upstream observable whose values are filtered.
    :param window: maximum age of a sample that is still taken into
        account, expressed in the units of ``loop.time()`` (seconds for
        the standard asyncio event loops).
    :param algorithm: ``'mean'`` or ``'median'``.  Any other value makes
        the output computation raise :class:`NotImplementedError`.
    :param loop: event loop used as the time source; defaults to
        ``asyncio.get_event_loop()``.
    """

    def __init__(self, input_: Observable, window=60, algorithm='mean', loop=None):
        super().__init__()
        self.loop = loop or asyncio.get_event_loop()
        self.input_ = input_
        self.samples = []  # list of Sample, in arrival order
        self.window = window
        self.algorithm = algorithm
        # Last error reported by the input, or None.  Initialised *before*
        # subscribing so the filter state is complete by the time the
        # callbacks are registered with the input.
        self.error = None

        observer = FunctionObserver()
        observer.subscribe_next(self._on_next)
        observer.subscribe_error(self._on_error)
        self.input_.subscribe(observer)

    def _remove_old_samples(self):
        """Drop every buffered sample that is older than ``window``."""
        now = self.current_time
        self.samples = [s for s in self.samples if now - s.time <= self.window]

    def _current_output(self):
        """Return the filtered value of the samples currently buffered.

        :raises ValueNotReady: if no sample is buffered.
        :raises NotImplementedError: if ``algorithm`` is neither
            ``'mean'`` nor ``'median'``.
        """
        # The emptiness check comes first: with nothing buffered the
        # caller gets ValueNotReady regardless of the chosen algorithm.
        if not self.samples:
            raise ValueNotReady('No sample available for current window')

        values = [s.value for s in self.samples]
        if self.algorithm == 'mean':
            return statistics.mean(values)
        if self.algorithm == 'median':
            return statistics.median(values)
        raise NotImplementedError(
            'Unsupported lowpass algorithm: {!r}'.format(self.algorithm))

    @property
    def current_time(self):
        """Current time as reported by the event loop's clock."""
        return self.loop.time()

    async def _on_next(self, value, **kwargs):
        """Buffer a new input value and trigger with the filtered output.

        Receiving a value clears any previously stored input error.
        """
        self.error = None
        self.samples.append(Sample(self.current_time, value))
        self._remove_old_samples()
        await self.trigger(self._current_output())

    async def _on_error(self, error, **kwargs):
        """Store an input error, discard all samples and trigger with it."""
        self.error = error
        self.samples = []
        await self.trigger(error)

    async def observe(self):
        """Return the current filtered value.

        :raises ValueNotReady: if no sample lies within the window.
        :raises NotImplementedError: if ``algorithm`` is unsupported.
        :raises Exception: the last error reported by the input, if it has
            not been superseded by a newer value.
        """
        if self.error is not None:
            raise self.error  # pylint: disable=raising-bad-type
        self._remove_old_samples()
        return self._current_output()