# Source code for caspia.reactive.observable

import asyncio
import functools
import logging
from typing import Generic, TypeVar

from .disposables import AnonymousDisposable
from .observers import Observer

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Type variable standing for the value type carried by a Generic class below.
T = TypeVar('T')


class Observable(Generic[T]):
    """Represents reactive value.

    It is possible to subscribe for its changes using .subscribe() and get
    its current value using .observe()
    """

    def __init__(self):
        # Currently subscribed observers; membership drives the
        # _became_subscribed / _became_unsubscribed notifications.
        self.observers = set()

    async def trigger(self, value: T, **kwargs):
        """Manually trigger value change.

        Calls ``on_error`` on every enabled observer when `value` is an
        ``Exception`` instance, otherwise ``on_next``, and waits for the
        resulting awaitables to finish.

        Args:
            value: New value (or exception) to deliver to the observers.
            **kwargs: Forwarded unchanged to every observer callback. Two
                keys are additionally interpreted here:
                ``loop`` - event loop used to schedule the callbacks
                (defaults to the current loop), and
                ``timeout`` - maximum number of seconds to wait for the
                callbacks (defaults to no limit).

        Exceptions raised by observer callbacks are not propagated; they are
        handed to ``self.exception_handler``. Callbacks still pending when
        the timeout expires are neither cancelled nor reported.
        """
        attrname = 'on_error' if isinstance(value, Exception) else 'on_next'
        loop = kwargs.get('loop')

        # asyncio.wait() no longer accepts bare coroutines or a ``loop``
        # argument on current Python versions, so every callback result is
        # wrapped into a future explicitly (on the requested loop, if any).
        targets = [
            asyncio.ensure_future(getattr(o, attrname)(value, **kwargs), loop=loop)
            for o in self.observers
            if o.is_enabled
        ]
        if not targets:
            return

        done, _ = await asyncio.wait(targets, timeout=kwargs.get('timeout'))
        for result in done:
            if result.cancelled():
                continue
            exception = result.exception()
            if exception is None:
                continue
            try:
                self.exception_handler(exception, result.get_stack())
            except Exception:  # pylint: disable=broad-except
                # A faulty handler must not break delivery bookkeeping.
                logger.exception('Exception in exception_handler: %r', exception)

    def dispose_subscription(self, observer):
        """Remove subscription of given observer.

        Calls ``_became_unsubscribed`` when the last observer is removed.

        Raises:
            KeyError: If `observer` is not currently subscribed.
        """
        self.observers.remove(observer)
        if not self.observers:
            self._became_unsubscribed()

    def _became_subscribed(self):
        """To be overridden. Called after first subscription.

        The default implementation calls ``self.become_subscribed()`` when
        such an attribute exists.
        """
        if hasattr(self, 'become_subscribed'):
            self.become_subscribed()

    def _became_unsubscribed(self):
        """To be overridden. Called after last subscription disposal.

        The default implementation calls ``self.become_unsubscribed()`` when
        such an attribute exists.
        """
        if hasattr(self, 'become_unsubscribed'):
            self.become_unsubscribed()

    async def observe(self) -> T:
        """Return current value of the observable.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def subscribe(self, *args, **kwargs):
        """Create observer using given `*args` and `**kwargs` and subscribe
        to its changes.

        Returns disposable representing the subscription => call .dispose()
        to unsubscribe.
        """
        observer = Observer.create(*args, **kwargs)
        self.observers.add(observer)
        if len(self.observers) == 1:
            self._became_subscribed()
        return AnonymousDisposable(
            functools.partial(self.dispose_subscription, observer))

    def _default_exception_handler(self, exc, stack):
        """Log an exception raised by an observer callback.

        Args:
            exc: The exception raised by the callback.
            stack: Stack of the failed task; unused by this default handler.
        """
        msg = 'Exception when processing subscribed function on observable %s: %r'
        logger.error(msg, self, exc, exc_info=exc)

    # Called as ``self.exception_handler(exc, stack)`` from trigger();
    # replace on a subclass or instance to customise error reporting.
    exception_handler = _default_exception_handler