Source code for caspia.meadow.cli.shell

import asyncio
import csv
import datetime
import inspect
import logging
import os
import re
import shlex
import signal
import time
from pathlib import Path

import click
import yaml
from prompt_toolkit import prompt_async
from prompt_toolkit.completion import Completer
from prompt_toolkit.contrib.completers import WordCompleter
from prompt_toolkit.document import Document
from prompt_toolkit.history import FileHistory, InMemoryHistory

from caspia.meadow.client import ServiceBrowser
from caspia.toolbox.logging import datetime_fmt
from caspia.toolbox.name import create_pattern_re

from . import utils

logger = logging.getLogger(__name__)


[docs]class ShellCompleter(Completer): def __init__(self, shell): self.shell = shell self.commands_completer = self.create_command_completer()
[docs] def create_command_completer(self): commands = self.list_commands() meta_dict = {} for cmd in commands: doc = inspect.getdoc(getattr(self.shell, 'do_' + cmd.replace('-', '_'))) usage, *_ = doc.split('\n') params = ' '.join(usage.split(' ')[1:]) meta_dict[cmd] = params return WordCompleter(commands, meta_dict=meta_dict)
[docs] def get_completions(self, document, complete_event): line = document.current_line_before_cursor.split(maxsplit=1) cmd, arg = line[0] if line else '', line[1] if len(line) == 2 else '' if not arg and document.char_before_cursor != ' ': yield from self.commands_completer.get_completions(document, complete_event) elif hasattr(self.shell, 'complete_' + cmd.replace('-', '_')): completer = getattr(self.shell, 'complete_' + cmd.replace('-', '_')) arg_document = Document(arg, cursor_position=document.cursor_position - len(cmd) - 1) yield from completer(arg_document, complete_event)
[docs] def list_commands(self): methods = [ f[3:].replace('_', '-') for f in dir(self.shell) if f.startswith('do_') and callable(getattr(self.shell, f.replace('-', '_'))) ] return methods
[docs]class Shell: def __init__(self, consumer_conn, command=None): self.consumer_conn = consumer_conn self.loop = asyncio.get_event_loop() self.browser = ServiceBrowser(connection=consumer_conn, loop=self.loop) self.browser.add_services_update_hook(self.on_services_update) self.lookup = self.browser.lookup self.command = command self.new_services_event = asyncio.Event(loop=self.loop) self.prepare_history()
[docs] def prepare_history(self): history_path = Path(os.environ.get('CSP_MEADOW_HISTORY_PATH', '~/.meadow_history')) history_path = history_path.expanduser() try: if not os.path.isfile(history_path): with open(history_path, 'w'): pass self.history = FileHistory(history_path) except IOError as e: logger.error('Could not use file history (%s): %r', history_path, e) self.history = InMemoryHistory()
@property def services(self): return self.lookup.services
[docs] async def on_services_update(self, added, removed): self.new_services_event.set()
[docs] async def wait_for_service(self, service_name): while True: if service_name in self.services: break await self.new_services_event.wait() self.new_services_event.clear()
[docs] async def run(self): if not self.command: print('Welcome to meadow. Enjoy!') while True: try: if self.command: cmd = self.command else: cmd = await prompt_async('(meadow) ', patch_stdout=True, completer=ShellCompleter(self), history=self.history) cmd = cmd.strip() parts = cmd.split() if len(parts) == 0: continue method_name = 'do_' + parts[0].replace('-', '_') if hasattr(self, method_name): await getattr(self, method_name)(cmd[len(parts[0]) + 1:]) else: print('Uknown command %r. Type "help" for help.' % parts[0]) except EOFError: print('Bye.') break except asyncio.CancelledError: break except Exception as e: # pylint: disable=broad-except print('Exception %r' % e) if self.command: break
[docs] async def do_list_services(self, _): """ list-services List all available services. """ for name in self.lookup.services: print(name)
[docs] async def do_help(self, _): """ help Print this message. """ for method_name in [name for name in dir(self) if name.startswith('do_')]: doc = inspect.getdoc(getattr(self, method_name)) usage, *description = doc.split('\n') click.secho(usage, bold=True) click.secho('\n'.join('\t' + d for d in description))
[docs] async def get_characteristic_or_inform(self, service_name, characteristic_name): if self.command: await self.wait_for_service(service_name) service = self.lookup.find(service_name) if service is None: click.secho('Service %s not found.' % service_name, err=True, fg='red') return try: characteristic = service[characteristic_name] return characteristic except KeyError: fmt = 'Service %s does not have characteristic %s.' click.secho(fmt % (service_name, characteristic_name), err=True, fg='red') return
[docs] def complete_service_and_characteristic(self, document, complete_event, cmd): word_number = len(document.current_line_before_cursor.split()) if document.char_before_cursor == ' ': word_number += 1 if word_number <= 1: yield from self.complete_service_name(document, complete_event) elif word_number == 2: service_name = document.current_line_before_cursor.split()[0] yield from self.complete_characteristic_name(document, complete_event, service_name, cmd=cmd)
[docs] def complete_service_name(self, document, complete_event): names = sorted(list(self.services.keys())) meta = {name: self.services[name].type for name in names} yield from WordCompleter(sorted(names), WORD=True, meta_dict=meta, match_middle=True).\ get_completions(document, complete_event)
[docs] def complete_characteristic_name(self, document, complete_event, service_name, cmd): try: service = self.services[service_name] except KeyError: return chars = service.characteristics if cmd == 'write': names = [char.name for char in chars.values() if char.writable] elif cmd == 'read': names = [char.name for char in chars.values() if char.readable] else: names = [char.name for char in chars.values()] meta = {char.name: char.value_type for char in chars.values()} yield from WordCompleter(sorted(names), WORD=True, meta_dict=meta).get_completions(document, complete_event)
[docs] @utils.measure_time_async async def do_write(self, arg): """ write <service> <characteristic> [value] Write to characteristic. If value is omitted, `None` will be used. """ parts = shlex.split(arg) if len(parts) == 4: service_name, characteristic_name, raw_value, raw_meta = parts meta = yaml.load(raw_meta) elif len(parts) == 3: service_name, characteristic_name, raw_value = parts meta = dict() elif len(parts) == 2: service_name, characteristic_name = parts raw_value, meta = 'null', dict() else: print('Usage: write <service> <characteristic> [value=None]') return characteristic = await self.get_characteristic_or_inform(service_name, characteristic_name) if not characteristic: return if characteristic.value_type == 'bytes': value = eval('b"{}"'.format(raw_value)) # pylint: disable=eval-used else: value = yaml.load(raw_value) await characteristic.write(value, extra=meta)
[docs] def complete_write(self, document, complete_event): return self.complete_service_and_characteristic(document, complete_event, cmd='write')
[docs] @utils.measure_time_async async def do_read(self, arg): """ read <service> <characteristic> Read characteristic value. """ parts = arg.split() if len(parts) != 2: print('Usage: read <service> <characteristic>') return characteristic = await self.get_characteristic_or_inform(parts[0], parts[1]) if characteristic is None: return value = await characteristic.read() print(value)
[docs] def complete_read(self, document, complete_event): return self.complete_service_and_characteristic(document, complete_event, cmd='read')
[docs] async def do_listen(self, arg): """ listen [filter] Listen for characteristic notifications. Filter may include "*" to match any string. Default filter is "*". """ pattern = arg.strip() or '*' pattern_c = create_pattern_re(pattern) def should_be_presented(service_name): return bool(pattern_c.fullmatch(service_name)) def on_value_notification(value, extra, service, characteristic, **_): if not should_be_presented(service): return service_instance = self.browser.lookup.find(service) if service_instance: characteristic_instance = service_instance[characteristic] value = characteristic_instance.deserialize_value(value) timestamp = datetime.datetime.fromtimestamp(time.time()).strftime(datetime_fmt()) click.secho('[%s] ' % timestamp, bold=True, nl=False, err=False) if isinstance(value, float): # Better formatting for float value = '{:0.3f}'.format(value) click.secho(service + ' ', fg='green', nl=False) click.secho(characteristic + ' ', fg='blue', nl=False) click.secho(str(value), bold=True, nl=False) if extra: click.secho(' ' + str(extra)) else: click.secho('') def on_error_notification(error, service, characteristic, **_): if not should_be_presented(service): return timestamp = datetime.datetime.fromtimestamp(time.time()).strftime(datetime_fmt()) click.secho('[%s] ' % timestamp, bold=True, nl=False, err=False) click.secho(service + ' ', fg='green', nl=False) click.secho(characteristic + ' ', fg='blue', nl=False) click.secho(repr(error), bold=True, fg='red') subscription = await self.consumer_conn.subscribe(None, None, on_value_notification, on_error_notification) loop = asyncio.get_event_loop() ctrl_c_future = loop.create_future() def on_control_c_signal(): ctrl_c_future.set_result(None) loop.add_signal_handler(signal.SIGINT, on_control_c_signal) try: print('Hit Ctr+C to stop listening...') await ctrl_c_future finally: subscription.unsubscribe() loop.remove_signal_handler(signal.SIGINT) print()
[docs] def complete_listen(self, document, complete_event): yield from self.complete_service_name(document, complete_event)
METADATA_TYPES = {'friendly-name': str, 'join-with-room': str}
[docs] async def do_metadata_store_csv(self, arg): """ metadata-store-csv <file-path> [filter] Load metadata of all service and store them in a file. It is possible to store metadata of some services only using a service-name filter. """ args = arg.split() if len(args) == 0 or len(args) > 2: print('Usage: metadata-store-csv <file-path> [filter]') return path = Path(args[0]) pattern = create_pattern_re(args[1] if len(args) == 2 else '*') services = [s for s in self.services if pattern.fullmatch(s)] all_metadata = {} with click.progressbar(services, label='Downloading metadata') as services_: for service in services_: metadata, _ = await self.consumer_conn.read(service, '$metadata') all_metadata[service] = { k: v for k, v in metadata.items() if k in self.METADATA_TYPES } with open(path, 'w', newline='') as f: writer = csv.DictWriter(f, fieldnames=['service-name'] + list(self.METADATA_TYPES)) writer.writeheader() for service in sorted(services): writer.writerow(dict(**{'service-name': service}, **all_metadata[service]))
[docs] async def do_metadata_load_csv(self, arg): """ metadata-load-csv <file-path> Load metadata from a file and write them to meadow. """ args = arg.split() if len(args) != 1: print('Usage: metadata-load-csv <file-path>') return with open(Path(args[0]).expanduser(), 'r', newline='') as f: reader = csv.DictReader(f) rows = list(reader) with click.progressbar(rows, label='Updating metadata') as rows_: for row in rows_: service_name = row.pop('service-name') metadata = { k: self.METADATA_TYPES[k](v) if v != '' else None for k, v in row.items() } await self.consumer_conn.write(service_name, '$metadata', metadata)
[docs]async def run_shell(consumer_conn, command=None): await Shell(consumer_conn, command=command).run()