import asyncio
import logging
import time
from caspia.meadow.client.gateway import ServiceGatewayMixin
from caspia.meadow.services.presence import PresenceBase, UserState
logger = logging.getLogger(__name__)
[docs]class PresenceObservation:
def __init__(self, user, state, source, timestamp, timeout):
self.user = user
self.state = state
self.source = source
self.timestamp = timestamp
self.timeout = timeout
@property
def timedout(self):
if self.timeout is not None:
return self.timeout_timestamp <= time.time()
else:
return False
@property
def timeout_timestamp(self):
if self.timeout is not None:
return self.timestamp + self.timeout
else:
return None
[docs]class Presence(ServiceGatewayMixin, PresenceBase):
def __init__(self, *args, users=[], loop=None, **kwargs):
super().__init__(*args, **kwargs)
self._observations = {user: list() for user in users}
self._state = {user: UserState.UNKNOWN.value for user in users}
self._loop = loop or asyncio.get_event_loop()
self._update_schedule_handle = None
def _deduce_status(self, observations):
""" Deduce user presence status based on collected observations.
Returns (UserState, PresenceObservation?)
"""
for observation in observations:
if observation.state == UserState.PRESENT:
return (UserState.PRESENT, observation)
for observation in observations:
if observation.state == UserState.AWAY:
return (UserState.AWAY, observation)
return (UserState.UNKNOWN, None)
def _drop_irrelevant_observations(self):
for user in self._observations:
observations = dict()
sorted_usr = sorted(self._observations[user], key=lambda o: o.timestamp, reverse=True)
for observation in sorted_usr:
if observation.timedout or observation.source in observations:
continue
else:
observations[observation.source] = observation
self._observations[user] = list(observations.values())
async def _update(self, data=None):
data = data or dict()
if self._update_schedule_handle:
self._update_schedule_handle.cancel()
self._update_schedule_handle = None
# update observations
new_observations = [
PresenceObservation(user=u,
state=UserState(d['state']),
source=d['source'],
timestamp=time.time(),
timeout=d['timeout']) for u, d in data.items()
]
for observation in new_observations:
self._observations[observation.user].append(observation)
# drop irrelevant observations
self._drop_irrelevant_observations()
# schedule next update
self._schedule_next_update()
# calculate new state
self._state = {u: self._deduce_status(o)[0].value for u, o in self._observations.items()}
await self.notify(self.state, self._state)
def _schedule_next_update(self):
if self._update_schedule_handle:
self._update_schedule_handle.cancel()
self._update_schedule_handle = None
self._drop_irrelevant_observations()
first_timeout = None
for user in self._observations:
timeouts = (o.timeout_timestamp for o in self._observations[user]
if o.timeout_timestamp is not None)
for timeout_timestamp in timeouts:
if first_timeout is None:
first_timeout = timeout_timestamp
else:
first_timeout = min(first_timeout, timeout_timestamp)
if first_timeout is None:
return
first_timeout = max(first_timeout, time.time())
first_timeout += 0.01 # minimum testing interval
def do_update():
asyncio.ensure_future(self._update(), loop=self._loop)
logger.debug('Schedulling presence update in %.1f seconds', first_timeout - time.time())
self._update_schedule_handle = self._loop.call_later(first_timeout - time.time(), do_update)
[docs] async def characteristic_write(self, characteristic, value, **kwargs):
if characteristic is self.update:
await self._update(value)
else:
raise NotImplementedError
[docs] async def characteristic_read(self, characteristic, **kwargs):
if characteristic is self.state:
return self._state
else:
raise NotImplementedError