import asyncio
class ManagedTask:
    """Wrap an awaitable-returning callable with delayed scheduling and serialized runs.

    An instance can be awaited directly (``await managed(...)``), or armed
    with :meth:`schedule` so that the wrapped callable runs after a delay.
    Every run goes through an ``asyncio.Lock`` owned by the instance, so two
    runs of the same instance never overlap.

    The class also implements ``__get__``: when an instance is stored as a
    class attribute, attribute access through an object returns a per-object
    copy that passes the object as the first argument to the wrapped callable.
    """

    def __init__(self, task, bound_obj=None):
        """Store the wrapped callable and initialise the scheduling state.

        Args:
            task: Callable whose return value is awaited by :meth:`run`.
            bound_obj: Optional object passed as the first positional
                argument to ``task`` on every run.
        """
        self.bound_obj = bound_obj
        self.task = task
        # Timer handle returned by ``loop.call_later`` while a run is pending.
        self.schedule_handle = None
        self.pending_args = []
        self.pending_kwargs = {}
        self.lock = asyncio.Lock()
        # Strong references to runs started by the timer. The event loop only
        # keeps weak references to tasks, so without this a scheduled run
        # could be garbage-collected before it completes.
        self._background_tasks = set()

    def schedule(self, delay=None, args=None, kwargs=None):
        """Arrange for the task to run after ``delay`` seconds.

        Any previously scheduled (not yet fired) run is cancelled first, so
        at most one timer is pending per instance.

        Args:
            delay: Seconds to wait; ``None`` (or any falsy value) means 0.0.
            args: Positional arguments for the run; defaults to none.
            kwargs: Keyword arguments for the run; defaults to none.
        """
        self.cancel()
        loop = asyncio.get_event_loop()
        self.pending_args = args or []
        self.pending_kwargs = kwargs or {}
        self.schedule_handle = loop.call_later(
            delay or 0.0, self._schedule_callback
        )

    def cancel(self):
        """Cancel the pending scheduled run, if there is one.

        Only the timer is cancelled; a run that has already started is not
        affected.
        """
        if self.schedule_handle is not None:
            self.pending_args = None
            self.pending_kwargs = None
            self.schedule_handle.cancel()
            self.schedule_handle = None

    def _schedule_callback(self):
        """Timer callback: start the run and reset the scheduling state."""
        # Snapshot the arguments before clearing the pending state.
        args = self.pending_args or []
        kwargs = self.pending_kwargs or {}
        self.pending_args = None
        self.pending_kwargs = None
        self.schedule_handle = None

        future = asyncio.ensure_future(self.run(*args, **kwargs))
        # Keep the task alive until it finishes, then drop the reference.
        self._background_tasks.add(future)
        future.add_done_callback(self._background_tasks.discard)

    async def run(self, *args, **kwargs):
        """Await the wrapped callable while holding the instance lock.

        When ``bound_obj`` is set it is passed as the first positional
        argument, ahead of ``args``.

        Returns:
            Whatever the awaited callable returns.
        """
        async with self.lock:
            if self.bound_obj is not None:
                return await self.task(self.bound_obj, *args, **kwargs)
            return await self.task(*args, **kwargs)

    async def __call__(self, *args, **kwargs):
        """Cancel any pending scheduled run, then run immediately."""
        self.cancel()
        return await self.run(*args, **kwargs)

    def __get__(self, obj, objtype=None):
        """Return the per-object bound copy of this task.

        Accessed on the class (``obj is None``) this returns the unbound
        instance itself. Accessed on an object, a copy bound to that object
        is created on first access and stored on the object under
        ``<task name>_task``; later accesses return the same copy.
        """
        if obj is None:
            return self
        key = self.task.__name__ + '_task'
        if not hasattr(obj, key):
            setattr(obj, key, self.copy(bound_obj=obj))
        return getattr(obj, key)

    def copy(self, bound_obj=None):
        """Return a new ``ManagedTask`` for the same callable.

        The copy has its own lock and scheduling state.

        Args:
            bound_obj: Object the copy should be bound to, if any.
        """
        return ManagedTask(self.task, bound_obj=bound_obj)
def managed_task():
    """Return a decorator that wraps a function in a ``ManagedTask``.

    Intended to be used with call parentheses::

        @managed_task()
        async def refresh(self):
            ...

    Returns:
        A one-argument decorator; calling it with ``f`` yields
        ``ManagedTask(f)``.
    """

    def decorator(f):
        return ManagedTask(f)

    return decorator