Source code for caspia.toolbox.services.presence.presence

import asyncio
import logging
import time

from caspia.meadow.client.gateway import ServiceGatewayMixin
from caspia.meadow.services.presence import PresenceBase, UserState

logger = logging.getLogger(__name__)


[docs]class PresenceObservation: def __init__(self, user, state, source, timestamp, timeout): self.user = user self.state = state self.source = source self.timestamp = timestamp self.timeout = timeout @property def timedout(self): if self.timeout is not None: return self.timeout_timestamp <= time.time() else: return False @property def timeout_timestamp(self): if self.timeout is not None: return self.timestamp + self.timeout else: return None
[docs]class Presence(ServiceGatewayMixin, PresenceBase): def __init__(self, *args, users=[], loop=None, **kwargs): super().__init__(*args, **kwargs) self._observations = {user: list() for user in users} self._state = {user: UserState.UNKNOWN.value for user in users} self._loop = loop or asyncio.get_event_loop() self._update_schedule_handle = None def _deduce_status(self, observations): """ Deduce user presence status based on collected observations. Returns (UserState, PresenceObservation?) """ for observation in observations: if observation.state == UserState.PRESENT: return (UserState.PRESENT, observation) for observation in observations: if observation.state == UserState.AWAY: return (UserState.AWAY, observation) return (UserState.UNKNOWN, None) def _drop_irrelevant_observations(self): for user in self._observations: observations = dict() sorted_usr = sorted(self._observations[user], key=lambda o: o.timestamp, reverse=True) for observation in sorted_usr: if observation.timedout or observation.source in observations: continue else: observations[observation.source] = observation self._observations[user] = list(observations.values()) async def _update(self, data=None): data = data or dict() if self._update_schedule_handle: self._update_schedule_handle.cancel() self._update_schedule_handle = None # update observations new_observations = [ PresenceObservation(user=u, state=UserState(d['state']), source=d['source'], timestamp=time.time(), timeout=d['timeout']) for u, d in data.items() ] for observation in new_observations: self._observations[observation.user].append(observation) # drop irrelevant observations self._drop_irrelevant_observations() # schedule next update self._schedule_next_update() # calculate new state self._state = {u: self._deduce_status(o)[0].value for u, o in self._observations.items()} await self.notify(self.state, self._state) def _schedule_next_update(self): if self._update_schedule_handle: self._update_schedule_handle.cancel() self._update_schedule_handle = None self._drop_irrelevant_observations() first_timeout = None for user in self._observations: timeouts = (o.timeout_timestamp for o in self._observations[user] if o.timeout_timestamp is not None) for timeout_timestamp in timeouts: if first_timeout is None: first_timeout = timeout_timestamp else: first_timeout = min(first_timeout, timeout_timestamp) if first_timeout is None: return first_timeout = max(first_timeout, time.time()) first_timeout += 0.01 # minimum testing interval def do_update(): asyncio.ensure_future(self._update(), loop=self._loop) logger.debug('Schedulling presence update in %.1f seconds', first_timeout - time.time()) self._update_schedule_handle = self._loop.call_later(first_timeout - time.time(), do_update)
[docs] async def characteristic_write(self, characteristic, value, **kwargs): if characteristic is self.update: await self._update(value) else: raise NotImplementedError
[docs] async def characteristic_read(self, characteristic, **kwargs): if characteristic is self.state: return self._state else: raise NotImplementedError