Source code for caspia.reactive.observers

import abc
import inspect
from typing import Generic, TypeVar

T = TypeVar('T')


class Observer(Generic[T], metaclass=abc.ABCMeta):
    """Abstract base for objects that receive values and errors.

    Concrete subclasses implement the ``on_next`` and ``on_error``
    coroutines.  The class also keeps a registry of factory callables
    (``creators``) that ``create`` consults to turn an arbitrary
    specification into an observer instance.
    """

    # Registry of factory callables consulted by ``create``.  It lives on
    # ``Observer`` itself, so every subclass registers into the same list.
    creators = []

    def enable(self):
        """Set the ``enabled`` flag of this observer to ``True``."""
        self.enabled = True

    def disable(self):
        """Set the ``enabled`` flag of this observer to ``False``."""
        self.enabled = False

    @property
    def is_enabled(self):
        """Return the ``enabled`` flag, or ``True`` if it was never set."""
        try:
            return self.enabled
        except AttributeError:
            # Neither enable() nor disable() has been called yet.
            return True

    @staticmethod
    def create(*args, **kwargs):
        """Build an observer from an arbitrary specification.

        Every registered creator is called in list order with the given
        arguments; the first result that is not ``None`` is returned.

        Raises:
            ValueError: if every registered creator returned ``None``.
        """
        for factory in Observer.creators:
            candidate = factory(*args, **kwargs)
            if candidate is None:
                continue
            return candidate
        raise ValueError('Invalid observer specification')

    @staticmethod
    def register_creator(creator):
        """Register ``creator`` ahead of all previously registered creators.

        The callable is returned unchanged, so this function can be used
        as a decorator.
        """
        Observer.creators.insert(0, creator)
        return creator

    @abc.abstractmethod
    async def on_next(self, value: T, **kwargs):
        """Handle a value; must be overridden by concrete subclasses."""
        return NotImplemented

    @abc.abstractmethod
    async def on_error(self, error, **kwargs):
        """Handle an error; must be overridden by concrete subclasses."""
        return NotImplemented
class FunctionObserver(Observer):
    """Observer that forwards notifications to plain or async callables.

    One callable can be attached for values (``subscribe_next``) and one
    for errors (``subscribe_error``).  When a callable is attached its
    signature is inspected once to decide whether it is given the
    value/error positionally and whether the extra keyword arguments are
    forwarded to it.  Awaitable results returned by a callable are awaited.
    """

    @staticmethod
    @Observer.register_creator
    def creator(*args, **kwargs):
        """Creator registered with ``Observer`` for callables.

        Returns:
            The sole argument itself when it already is a
            ``FunctionObserver``; a new ``FunctionObserver`` subscribed to
            ``args[0]`` when that is callable; otherwise ``None``.
        """
        if len(args) == 1 and isinstance(args[0], FunctionObserver):
            return args[0]
        if args and callable(args[0]):
            observer = FunctionObserver()
            # NOTE(review): extra positional/keyword arguments are forwarded
            # as-is, but subscribe_next accepts only the callback, so any
            # extra argument raises TypeError here.  Kept for compatibility.
            observer.subscribe_next(args[0], *args[1:], **kwargs)
            return observer
        return None

    def __init__(self):
        super().__init__()
        self.next = None
        self.error = None

    def subscribe_next(self, on_next):
        """Attach ``on_next`` as the value callback and cache its call shape."""
        self.next = on_next
        self.next_params = self._analyze_func(on_next)

    def subscribe_error(self, on_error):
        """Attach ``on_error`` as the error callback and cache its call shape."""
        self.error = on_error
        self.error_params = self._analyze_func(on_error)

    async def on_next(self, value, **kwargs):
        """Deliver ``value`` to the value callback; no-op when none is set."""
        if not self.next:
            return
        await self._invoke(self.next, self.next_params, value, kwargs)

    async def on_error(self, error, **kwargs):
        """Deliver ``error`` to the error callback; no-op when none is set."""
        if not self.error:
            return
        await self._invoke(self.error, self.error_params, error, kwargs)

    async def _invoke(self, func, params, payload, kwargs):
        """Call ``func`` as described by ``params`` and await the result if needed."""
        pass_payload, pass_kwargs = params
        args = (payload, ) if pass_payload else ()
        retval = func(*args, **(kwargs if pass_kwargs else {}))
        if inspect.isawaitable(retval):
            await retval

    def _analyze_func(self, func):
        """Return the tuple ``(pass_value, pass_kwargs)`` for ``func``.

        ``pass_value`` is true when ``func`` has a parameter that can take
        a positional argument.  ``pass_kwargs`` is true when ``func`` has
        two or more parameters or declares ``**kwargs``.

        Checking parameter kinds instead of only counting parameters keeps
        the value from being passed positionally to callables that cannot
        take it (for example ``def cb(**kwargs)``), which would raise
        ``TypeError``.  The result is unchanged for signatures that have a
        positional-capable parameter or no parameters at all.
        """
        parameters = inspect.signature(func).parameters
        positional_kinds = (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
            inspect.Parameter.VAR_POSITIONAL,
        )
        kinds = [param.kind for param in parameters.values()]
        pass_value = any(kind in positional_kinds for kind in kinds)
        pass_kwargs = (len(kinds) >= 2
                       or inspect.Parameter.VAR_KEYWORD in kinds)
        return pass_value, pass_kwargs

    def __str__(self):
        """Describe the attached callable as ``module.name(signature)``."""
        func = self.next or self.error or None
        if func is None:
            return '<no function>'
        sig = inspect.signature(func)
        module = inspect.getmodule(func)
        prefix = f'{module.__name__}.' if module else ''
        name = getattr(func, '__name__', 'unknown')
        return f'{prefix}{name}{sig}'