import asyncio
import time as timemod
import arrow
from caspia import reactive
[docs]def parse_time(tm):
if isinstance(tm, tuple):
return tm
else:
h, m = tm.split(':')
return int(h), int(m)
[docs]def get_time_observable(tm):
if isinstance(tm, reactive.Observable):
return tm
else:
return reactive.Value(parse_time(tm))
[docs]class TimeObservable(reactive.Observable):
def __init__(self):
super().__init__()
self.update_task = None
self.time = None
[docs] def map_time(self, tm: arrow.Arrow):
return (tm.hour, tm.minute)
[docs] def update_delay(self, now):
return (60 - now.second) + 1
[docs] def become_subscribed(self):
self.update_current_time()
self.start_updates()
[docs] def become_unsubscribed(self):
self.stop_updates()
[docs] def start_updates(self):
if getattr(self, 'mocked', False):
return
if self.update_task:
self.update_task.cancel()
self.update_task = None
self.update_task = asyncio.ensure_future(self.update_periodically())
[docs] async def update_periodically(self):
try:
while True:
now = arrow.now()
delay = self.update_delay(now)
await asyncio.sleep(delay)
self.update_current_time()
await self.update(self.time)
finally:
self.update_task = None
[docs] def stop_updates(self):
if self.update_task:
self.update_task.cancel()
self.update_task = None
[docs] def update_current_time(self):
self.time = self.map_time(arrow.now())
[docs] async def update(self, value, notify=True):
if isinstance(value, str):
value = parse_time(value)
self.time = value
await self.trigger(value)
[docs] async def observe(self):
if not self.observers and not getattr(self, 'mocked', False):
# the time is not automatically updated if there is no observers
self.update_current_time()
return self.time
def __str__(self):
return '<(hour, minute)>'
def __repr__(self):
return str(self)
[docs]class SecondsObservable(TimeObservable):
def __init__(self, interval):
self.interval = interval
super().__init__()
[docs] def map_time(self, tm: arrow.Arrow):
return int(timemod.time())
[docs] def update_delay(self, now):
return self.interval
[docs]def time_between(start, end):
"""Create observable, which is true for the given period in day.
Example: time_between('13:00', '14:00')
"""
def _is_between(start, end, current):
if start < end:
return start <= current <= end
else:
return current <= end or current >= start
start, end = get_time_observable(start), get_time_observable(end)
return reactive.lambda_(_is_between, start, end, time)
[docs]def time_is(tm):
"""Create observable which is true for the given time in a day.
Example: time_is('14:40') is True from 14:40:00 to 14:40:59. False otherwise.
"""
return time == get_time_observable(tm)
[docs]def every(*, sec=1, default=None):
"""Create observable which is true every `sec` seconds."""
return reactive.impuls(SecondsObservable(sec), default=default)
# Current time - tuple (hour, minute)
time = TimeObservable()
# Current hour
hour = reactive.lambda_(lambda tm: tm[0], time, desc='hour')
# True once every minute, None otherwise
# pylint: disable=simplifiable-if-expression
every_minute = reactive.lambda_(lambda tm: False if tm is None else True,
reactive.impuls(time),
desc='every_minute')
# True once every hour, None otherwise
every_hour = reactive.lambda_(lambda tm: False if tm is None else True,
reactive.impuls(hour),
desc='every_hour')
# Day of week as string (for example 'sunday')
weekday = reactive.lambda_(lambda _: arrow.now().format('dddd').lower(), every_hour, desc='weekday')
monday = weekday == 'monday'
tuesday = weekday == 'tuesday'
wednesday = weekday == 'wednesday'
thursday = weekday == 'thursday'
friday = weekday == 'friday'
saturday = weekday == 'saturday'
sunday = weekday == 'sunday'
weekend = saturday | sunday
workingday = weekend == False # pylint: disable=singleton-comparison