Source code for caspia.node.cli.utils

import asyncio
import functools
import os.path
import sys

import click
from aiopollen import Client, errors

import caspia.node
from caspia.node.utils import hwid_read


def async_command(f):
    """Turn the coroutine function ``f`` into a blocking callable.

    The returned wrapper runs ``f(*args, **kwargs)`` to completion on the
    calling thread's asyncio event loop and returns its result.

    If the run raises ``asyncio.TimeoutError`` or
    ``aiopollen.errors.TimeoutError``, the message ``Timed out.`` is written
    in red to stderr and the process exits with status 1.  Any other
    exception propagates to the caller unchanged.
    """
    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No current loop for this thread (always the case in non-main
            # threads, and in the main thread on the newest Python
            # releases).  Create one and install it so later calls on this
            # thread reuse it.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(f(*args, **kwargs))
        except (asyncio.TimeoutError, errors.TimeoutError):
            click.secho('Timed out.', fg='red', err=True)
            sys.exit(1)
    return wrapped
def with_prepared_client(f):
    """Decorate the coroutine function ``f`` so it receives a ``Client``.

    The resulting callable is synchronous (via ``async_command``) and takes
    the click context object as its first argument (via ``click.pass_obj``).
    A ``Client`` is built from the object's ``local_can_id`` and ``bus``
    entries plus the current event loop, and ``f`` is awaited inside the
    client's ``with`` block with the client passed as the ``client`` keyword
    argument.  The value returned by ``f`` is discarded.
    """
    async def run_with_client(obj, *args, **kwargs):
        event_loop = asyncio.get_event_loop()
        connection = Client(obj['local_can_id'], obj['bus'], event_loop)
        with connection as client:
            await f(*args, client=client, **kwargs)

    # Innermost first: copy f's metadata, make the coroutine blocking, then
    # let click inject the context object.
    named = functools.wraps(f)(run_with_client)
    return click.pass_obj(async_command(named))
def with_prepared_node(f):
    """Decorate the coroutine function ``f`` so it receives a ``Node``.

    The resulting callable is synchronous (via ``async_command``) and takes
    the click context object as its first argument (via ``click.pass_obj``).
    A ``Client`` is built from the object's ``local_can_id`` and ``bus``
    entries plus the current event loop; inside the client's ``with`` block
    a ``caspia.node.Node`` is created for the object's ``can_id`` and ``f``
    is awaited with it as the ``node`` keyword argument.  The value returned
    by ``f`` is discarded.
    """
    async def run_with_node(obj, *args, **kwargs):
        event_loop = asyncio.get_event_loop()
        connection = Client(obj['local_can_id'], obj['bus'], event_loop)
        with connection as client:
            target = caspia.node.Node(client, obj['can_id'])
            await f(*args, node=target, **kwargs)

    # Innermost first: copy f's metadata, make the coroutine blocking, then
    # let click inject the context object.
    named = functools.wraps(f)(run_with_node)
    return click.pass_obj(async_command(named))
def component_group_command(component_cls):
    """Build a decorator that registers a function as a ``click.group()``.

    The group takes two positional arguments, ``can_id`` (validated by
    ``canid_input()``) and ``comp_id`` (validated by ``int_input(15)``).
    When the group is invoked, both values and ``component_cls`` are stored
    in the click context object under the keys ``can_id``, ``comp_id`` and
    ``comp_cls``, and the decorated function is called with ``can_id`` and
    ``comp_id`` as keyword arguments.
    """
    def decorator(f):
        def wrapped(obj, can_id, comp_id, *args, **kwargs):
            # Publish the selection for the group's sub-commands.
            shared = (
                ('can_id', can_id),
                ('comp_id', comp_id),
                ('comp_cls', component_cls),
            )
            for key, value in shared:
                obj[key] = value
            return f(*args, can_id=can_id, comp_id=comp_id, **kwargs)

        # Applied innermost-first, matching the order of a decorator stack
        # that lists group, can_id, comp_id, pass_obj, wraps top to bottom.
        command = functools.wraps(f)(wrapped)
        command = click.pass_obj(command)
        command = click.argument('comp_id', type=int_input(15))(command)
        command = click.argument('can_id', type=canid_input())(command)
        return click.group()(command)

    return decorator
def with_group_component(f):
    """Decorate the coroutine function ``f`` so it receives a component.

    Builds on ``with_prepared_node``: using the node it provides, the class
    stored under ``comp_cls`` in the click context object is registered with
    the id stored under ``comp_id``.  The second item of the pair returned
    by ``node.register_component`` is passed to ``f`` as the ``component``
    keyword argument.  The value returned by ``f`` is discarded.
    """
    async def run_with_component(obj, node, *args, **kwargs):
        registration = node.register_component(obj['comp_cls'], obj['comp_id'])
        _, component = registration
        await f(*args, component=component, **kwargs)

    # Innermost first: copy f's metadata, prepare the node, then let click
    # inject the context object.
    named = functools.wraps(f)(run_with_component)
    return click.pass_obj(with_prepared_node(named))
def hwid_input():
    """Return a converter that feeds its argument to ``hwid_read``.

    If the argument names an existing file, that file is opened in text
    mode and the open file object is handed to ``hwid_read``; otherwise the
    argument itself is handed over.  The converter returns whatever
    ``hwid_read`` returns.
    """
    def _input(txt):
        # Not a file on disk: treat the text itself as the input.
        if not os.path.isfile(txt):
            return hwid_read(txt)
        with open(txt, 'r') as stream:
            return hwid_read(stream)

    return _input
def int_input(max_value):
    """Return a validator for integers that do not exceed ``max_value``.

    The validator accepts either an ``int``, which is used as it is, or a
    string, which is parsed with ``int(s, 0)`` so that ``0x``, ``0o`` and
    ``0b`` prefixes are honoured.  The upper bound is enforced for both
    kinds of input; no lower bound is enforced.

    Raises:
        ValueError: if a string cannot be parsed as an integer, or the
            value is greater than ``max_value``.
    """
    def validator(s):
        # int(s, 0) rejects non-string input, so ints skip the parsing
        # step -- but they must still pass the range check below.
        value = s if isinstance(s, int) else int(s, 0)
        if value > max_value:
            raise ValueError(
                '{} exceeds the maximum allowed value {}'.format(value, max_value))
        return value

    return validator
def canid_input():
    """Return an ``int_input`` validator whose upper bound is ``0x1FF``."""
    highest_can_id = 0x1FF
    return int_input(highest_can_id)