Source code for caspia.pan.reactive.time

import asyncio
import time as timemod

import arrow

from caspia import reactive


def parse_time(tm):
    """Normalize a time-of-day value to an ``(hour, minute)`` tuple.

    :param tm: either a tuple, which is returned unchanged, or a string
        of the form ``'HH:MM'``.
    :return: the original tuple, or a tuple of two ints parsed from the
        string.
    :raises ValueError: if the string does not consist of exactly two
        ``':'``-separated parts, or a part is not an integer.
    """
    if not isinstance(tm, tuple):
        hour_text, minute_text = tm.split(':')
        tm = (int(hour_text), int(minute_text))
    return tm
def get_time_observable(tm):
    """Return ``tm`` as an observable.

    An existing ``reactive.Observable`` is passed through untouched; any
    other value is parsed with :func:`parse_time` and wrapped in a
    ``reactive.Value``.
    """
    if not isinstance(tm, reactive.Observable):
        tm = reactive.Value(parse_time(tm))
    return tm
class TimeObservable(reactive.Observable):
    """Observable whose value is the current time as ``(hour, minute)``.

    The value is taken from ``arrow.now()``.  While periodic updates are
    running, an asyncio task refreshes the value one second after every
    minute boundary and notifies observers through ``trigger``.

    Setting a truthy ``mocked`` attribute on an instance disables both
    the background task and the on-demand refresh done by
    :meth:`observe`, so the value can be driven manually via
    :meth:`update`.
    """

    def __init__(self):
        super().__init__()
        # Handle of the running refresh task, or None when no task runs.
        self.update_task = None
        # Last computed value; None until the first refresh.
        self.time = None

    def map_time(self, tm: arrow.Arrow):
        """Convert an ``arrow.Arrow`` instant to the observable's value."""
        return (tm.hour, tm.minute)

    def update_delay(self, now):
        """Return the seconds to sleep before the next refresh.

        The delay ends one second after the next minute boundary, so the
        refreshed value is guaranteed to belong to the new minute.
        """
        return (60 - now.second) + 1

    def become_subscribed(self):
        """Refresh the value and start periodic updates.

        NOTE(review): presumably called by the base class when the first
        observer subscribes -- confirm against ``reactive.Observable``.
        """
        self.update_current_time()
        self.start_updates()

    def become_unsubscribed(self):
        """Stop periodic updates.

        NOTE(review): presumably called by the base class when the last
        observer unsubscribes -- confirm against ``reactive.Observable``.
        """
        self.stop_updates()

    def start_updates(self):
        """(Re)start the background refresh task unless the instance is mocked."""
        if getattr(self, 'mocked', False):
            return
        self.stop_updates()
        task = asyncio.ensure_future(self.update_periodically())
        # Clear the handle when the task ends, but only if it is still the
        # current one (see _clear_update_task).
        task.add_done_callback(self._clear_update_task)
        self.update_task = task

    def _clear_update_task(self, task):
        """Forget ``task`` once it has finished, if it is still the current task.

        Cancellation is delivered asynchronously, so a cancelled task ends
        only after a replacement may already have been stored in
        ``update_task``.  The identity check keeps a finished task from
        wiping the handle of its successor, which would make the successor
        impossible to cancel from :meth:`stop_updates`.
        """
        if self.update_task is task:
            self.update_task = None

    async def update_periodically(self):
        """Refresh the value and notify observers forever.

        Runs until cancelled or until a refresh raises.
        """
        while True:
            delay = self.update_delay(arrow.now())
            await asyncio.sleep(delay)
            self.update_current_time()
            await self.update(self.time)

    def stop_updates(self):
        """Cancel the background refresh task, if one is running."""
        if self.update_task:
            self.update_task.cancel()
            self.update_task = None

    def update_current_time(self):
        """Recompute ``self.time`` from the current clock without notifying."""
        self.time = self.map_time(arrow.now())

    async def update(self, value, notify=True):
        """Set the value and, unless ``notify`` is false, trigger observers.

        :param value: new value; a string is parsed with ``parse_time``.
        :param notify: when false the value is stored silently.
        """
        if isinstance(value, str):
            value = parse_time(value)
        self.time = value
        if notify:
            await self.trigger(value)

    async def observe(self):
        """Return the current value.

        Without observers no background task keeps the value fresh, so it
        is recomputed on demand here (unless the instance is mocked).
        """
        if not self.observers and not getattr(self, 'mocked', False):
            self.update_current_time()
        return self.time

    def __str__(self):
        return '<(hour, minute)>'

    def __repr__(self):
        return str(self)
class SecondsObservable(TimeObservable):
    """Time observable refreshed at a fixed interval.

    Its value is the current Unix timestamp truncated to an int.  The
    interval is handed to the parent's refresh loop as the sleep delay.
    """

    def __init__(self, interval):
        # The interval is stored before the parent initializer runs.
        self.interval = interval
        super().__init__()

    def update_delay(self, now):
        """Return the configured interval; ``now`` is not used."""
        return self.interval

    def map_time(self, tm: arrow.Arrow):
        """Return the current Unix time as an int; ``tm`` is not used."""
        seconds_since_epoch = timemod.time()
        return int(seconds_since_epoch)
def time_between(start, end):
    """Create an observable that is true during a daily period.

    Both bounds are inclusive and may be ``'HH:MM'`` strings,
    ``(hour, minute)`` tuples or observables.  When ``start`` is not
    earlier than ``end`` the period is treated as wrapping around
    midnight; with equal bounds the result is therefore always true.

    Example:
        time_between('13:00', '14:00')
    """

    def _is_between(start, end, current):
        if start < end:
            return start <= current <= end
        # Period crosses midnight: late evening or early morning.
        return current <= end or current >= start

    start_observable = get_time_observable(start)
    end_observable = get_time_observable(end)
    return reactive.lambda_(_is_between, start_observable, end_observable, time)
def time_is(tm):
    """Create an observable that is true at the given time of day.

    ``tm`` may be an ``'HH:MM'`` string, an ``(hour, minute)`` tuple or
    an observable.

    Example:
        time_is('14:40') is True from 14:40:00 to 14:40:59.
        False otherwise.
    """
    target = get_time_observable(tm)
    return time == target
def every(*, sec=1, default=None):
    """Create an observable which is true every `sec` seconds.

    Built as ``reactive.impuls`` over a ``SecondsObservable`` with the
    given interval; ``default`` is forwarded to ``reactive.impuls``.
    """
    ticker = SecondsObservable(sec)
    return reactive.impuls(ticker, default=default)
# Current time as an (hour, minute) tuple.
time = TimeObservable()

# Current hour: the first element of `time`.
hour = reactive.lambda_(lambda tm: tm[0], time, desc='hour')

# True once every minute; the lambda maps a None impulse value to False.
every_minute = reactive.lambda_(
    lambda tm: tm is not None,
    reactive.impuls(time),
    desc='every_minute',
)

# True once every hour; the lambda maps a None impulse value to False.
every_hour = reactive.lambda_(
    lambda tm: tm is not None,
    reactive.impuls(hour),
    desc='every_hour',
)

# Lower-case day-of-week name (for example 'sunday'), derived from every_hour.
weekday = reactive.lambda_(
    lambda _: arrow.now().format('dddd').lower(),
    every_hour,
    desc='weekday',
)

# One observable per day of the week.
monday = weekday == 'monday'
tuesday = weekday == 'tuesday'
wednesday = weekday == 'wednesday'
thursday = weekday == 'thursday'
friday = weekday == 'friday'
saturday = weekday == 'saturday'
sunday = weekday == 'sunday'

weekend = saturday | sunday
# `== False` is intentional: it builds an observable via the overloaded
# operator, which `not weekend` would not do.
workingday = weekend == False  # pylint: disable=singleton-comparison