import asyncio
import functools
import logging
from typing import Generic, TypeVar
from .disposables import AnonymousDisposable
from .observers import Observer
logger = logging.getLogger(__name__)
T = TypeVar('T')
[docs]class Observable(Generic[T]):
"""Represents reactive value.
It is possible to subscribe for its changes using .subscribe()
and get its current value using .observe()
"""
def __init__(self):
self.observers = set()
[docs] async def trigger(self, value: T, **kwargs):
"""Manually trigger value change."""
loop = kwargs.get('loop', asyncio.get_event_loop())
attrname = 'on_next' if not isinstance(value, Exception) else 'on_error'
targets = [getattr(o, attrname)(value, **kwargs) for o in self.observers if o.is_enabled]
if len(targets):
done, _ = await asyncio.wait(targets, loop=loop, timeout=kwargs.get('timeout'))
for result in done:
if not result.cancelled() and result.exception():
try:
exception = result.exception()
stack = result.get_stack()
self.exception_handler(exception, stack)
except Exception: # pylint: disable=broad-except
logger.exception('Exception in exception_handler: %r', exception)
[docs] def dispose_subscription(self, observer):
"""Remove subscription of given observer."""
self.observers.remove(observer)
if len(self.observers) == 0:
self._became_unsubscribed()
def _became_subscribed(self):
"""To be overriden. Called after first subscription."""
if hasattr(self, 'become_subscribed'):
getattr(self, 'become_subscribed')()
def _became_unsubscribed(self):
"""To be overriden. Called after last subscription disposal."""
if hasattr(self, 'become_unsubscribed'):
getattr(self, 'become_unsubscribed')()
[docs] async def observe(self) -> T:
"""Return current value of the observable."""
raise NotImplementedError
[docs] def subscribe(self, *args, **kwargs):
"""Create observer using given `*args` and `**kwargs` and subscribe to its changes.
Returns disposable representing the subscription => call .dispose() to unsubscribe.
"""
observer = Observer.create(*args, **kwargs)
self.observers.add(observer)
if len(self.observers) == 1:
self._became_subscribed()
return AnonymousDisposable(functools.partial(self.dispose_subscription, observer))
def _default_exception_handler(self, exc, stack):
msg = 'Exception when processing subscribed function on observable %s: %r'
logger.error(msg, self, exc, exc_info=exc)
exception_handler = _default_exception_handler