import abc
import inspect
from typing import Generic, TypeVar
T = TypeVar('T')
[docs]class Observer(Generic[T], metaclass=abc.ABCMeta):
creators = []
[docs] def enable(self):
self.enabled = True
[docs] def disable(self):
self.enabled = False
@property
def is_enabled(self):
return getattr(self, 'enabled', True)
[docs] @staticmethod
def create(*args, **kwargs):
for creator in Observer.creators:
observer = creator(*args, **kwargs)
if observer is not None:
return observer
raise ValueError('Invalid observer specification')
[docs] @staticmethod
def register_creator(creator):
Observer.creators.insert(0, creator)
return creator
[docs] @abc.abstractmethod
async def on_next(self, value: T, **kwargs):
return NotImplemented
[docs] @abc.abstractmethod
async def on_error(self, error, **kwargs):
return NotImplemented
[docs]class FunctionObserver(Observer):
[docs] @staticmethod
@Observer.register_creator
def creator(*args, **kwargs):
if len(args) == 1 and isinstance(args[0], FunctionObserver):
return args[0]
elif len(args) and callable(args[0]):
observer = FunctionObserver()
observer.subscribe_next(args[0], *args[1:], **kwargs)
return observer
return None
def __init__(self):
super().__init__()
self.next = None
self.error = None
[docs] def subscribe_next(self, on_next):
self.next = on_next
self.next_params = self._analyze_func(on_next)
[docs] def subscribe_error(self, on_error):
self.error = on_error
self.error_params = self._analyze_func(on_error)
[docs] async def on_next(self, value, **kwargs):
if not self.next:
return
pass_value, pass_kwargs = self.next_params
args = (value, ) if pass_value else ()
kwargs = kwargs if pass_kwargs else {}
retval = self.next(*args, **kwargs)
if inspect.isawaitable(retval):
await retval
[docs] async def on_error(self, error, **kwargs):
if not self.error:
return
pass_error, pass_kwargs = self.error_params
args = (error, ) if pass_error else ()
kwargs = kwargs if pass_kwargs else {}
retval = self.error(*args, **kwargs)
if inspect.isawaitable(retval):
await retval
def _analyze_func(self, func):
""" Returns tuple (pass_value, pass_kwargs) """
sig = inspect.signature(func)
return len(sig.parameters) >= 1, len(sig.parameters) >= 2
def __str__(self):
func = self.next or self.error or None
if func is None:
return '<no function>'
else:
sig = inspect.signature(func)
module = inspect.getmodule(func)
prefix = f'{module.__name__}.' if module else ''
name = getattr(func, '__name__', 'unknown')
return f'{prefix}{name}{sig}'