# Source code for caspia.toolbox.managed_task

import asyncio


class ManagedTask:
    """Wrap an awaitable-returning callable with serialized, deferrable runs.

    Every run goes through :meth:`run`, which holds ``self.lock`` while the
    wrapped callable is awaited, so runs of one ``ManagedTask`` never overlap.
    A run can be started immediately by awaiting the instance, or deferred
    with :meth:`schedule`; at most one deferred run is pending at a time.

    The class also implements ``__get__``, so an instance stored as a class
    attribute hands out a separate copy per owning object, with that object
    passed as the first argument to the wrapped callable.
    """

    def __init__(self, task, bound_obj=None):
        """Store the wrapped callable and initialise the scheduling state.

        :param task: callable whose return value is awaited by :meth:`run`.
        :param bound_obj: optional object prepended to the call arguments.
        """
        self.bound_obj = bound_obj
        self.task = task
        # Timer handle of the pending deferred run, or None when idle.
        self.schedule_handle = None
        self.pending_args = []
        self.pending_kwargs = {}
        self.lock = asyncio.Lock()
        # Strong references to runs started by the scheduler.  The event loop
        # only keeps weak references to tasks, so without this a run could be
        # garbage-collected before it finishes.
        self._background_tasks = set()

    def schedule(self, delay=None, args=None, kwargs=None):
        """Arrange for :meth:`run` to be started after ``delay`` seconds.

        Any previously scheduled (not yet started) run is cancelled first.

        :param delay: seconds to wait; ``None`` or ``0`` means as soon as the
            event loop gets to it.
        :param args: positional arguments for the run (defaults to none).
        :param kwargs: keyword arguments for the run (defaults to none).
        """
        self.cancel()
        loop = asyncio.get_event_loop()
        self.pending_args = args or []
        self.pending_kwargs = kwargs or {}
        self.schedule_handle = loop.call_later(
            delay or 0.0, self._schedule_callback
        )

    def cancel(self):
        """Cancel the pending deferred run, if there is one.

        Runs that have already been started are not affected.
        """
        if self.schedule_handle:
            self.pending_args = None
            self.pending_kwargs = None
            self.schedule_handle.cancel()
            self.schedule_handle = None

    def _schedule_callback(self):
        """Timer callback: start the deferred run and reset the pending state."""
        background = asyncio.ensure_future(
            self.run(*self.pending_args, **self.pending_kwargs)
        )
        # Hold the task until it completes; it removes itself when done.
        self._background_tasks.add(background)
        background.add_done_callback(self._background_tasks.discard)
        self.pending_args = None
        self.pending_kwargs = None
        self.schedule_handle = None

    async def run(self, *args, **kwargs):
        """Await the wrapped callable while holding ``self.lock``.

        When ``bound_obj`` is set it is passed as the first positional
        argument, ahead of ``args``.

        :returns: whatever awaiting the wrapped callable yields.
        """
        async with self.lock:
            if self.bound_obj is not None:
                return await self.task(self.bound_obj, *args, **kwargs)
            else:
                return await self.task(*args, **kwargs)

    async def __call__(self, *args, **kwargs):
        """Drop any pending deferred run, then run now and return the result."""
        self.cancel()
        return await self.run(*args, **kwargs)

    def __get__(self, obj, objtype=None):
        """Return the per-object copy of this task (descriptor protocol).

        Accessed on the class (``obj is None``) this returns the shared
        instance itself.  Accessed on an object, a copy bound to that object
        is created on first use and cached on it under the attribute
        ``<wrapped callable's __name__>_task``.
        """
        if obj is None:
            return self
        key = self.task.__name__ + '_task'
        if not hasattr(obj, key):
            setattr(obj, key, self.copy(bound_obj=obj))
        return getattr(obj, key)

    def copy(self, bound_obj=None):
        """Return a fresh ``ManagedTask`` for the same callable.

        The copy has its own lock and scheduling state.

        :param bound_obj: object the copy should be bound to, if any.
        """
        return ManagedTask(self.task, bound_obj=bound_obj)
def managed_task():
    """Build a decorator that turns a function into a :class:`ManagedTask`.

    Used with call parentheses (``@managed_task()``); the decorated name is
    replaced by a ``ManagedTask`` wrapping the original function.
    """
    def wrap(func):
        # No bound object here: binding is left to ManagedTask itself.
        managed = ManagedTask(func)
        return managed

    return wrap