import asyncio
import csv
import datetime
import inspect
import logging
import os
import re
import shlex
import signal
import time
from pathlib import Path
import click
import yaml
from prompt_toolkit import prompt_async
from prompt_toolkit.completion import Completer
from prompt_toolkit.contrib.completers import WordCompleter
from prompt_toolkit.document import Document
from prompt_toolkit.history import FileHistory, InMemoryHistory
from caspia.meadow.client import ServiceBrowser
from caspia.toolbox.logging import datetime_fmt
from caspia.toolbox.name import create_pattern_re
from . import utils
# Module-level logger named after this module; Shell.prepare_history reports
# history-file problems through it.
logger = logging.getLogger(__name__)
class ShellCompleter(Completer):
    """Prompt completer that dispatches to a shell object's commands.

    The first word of the line is completed against the shell's ``do_*``
    methods (shown with dashes instead of underscores). Once a command has been
    typed, completion is delegated to the shell's matching ``complete_*``
    method, if there is one.
    """

    def __init__(self, shell):
        self.shell = shell
        self.commands_completer = self.create_command_completer()

    def create_command_completer(self):
        """Build the completer for command names.

        Each command's completion meta text is the parameter part of the first
        line of its docstring (everything after the command name).
        """
        commands = self.list_commands()
        meta_dict = {}
        for cmd in commands:
            handler = getattr(self.shell, 'do_' + cmd.replace('-', '_'))
            # getdoc() returns None for undocumented handlers (e.g. when a
            # decorator does not preserve the docstring); treat that as "no usage".
            doc = inspect.getdoc(handler) or ''
            usage = doc.split('\n', 1)[0]
            meta_dict[cmd] = ' '.join(usage.split(' ')[1:])
        # WORD=True: command names contain dashes, which the default word
        # boundary would split on, breaking completion of e.g. "list-services".
        return WordCompleter(commands, meta_dict=meta_dict, WORD=True)

    def get_completions(self, document, complete_event):
        """Yield completions for the command name or for its arguments."""
        words = document.current_line_before_cursor.split(maxsplit=1)
        cmd = words[0] if words else ''
        arg = words[1] if len(words) == 2 else ''

        if not arg and document.char_before_cursor != ' ':
            # Still typing the first word: complete command names.
            yield from self.commands_completer.get_completions(document, complete_event)
            return

        method_suffix = cmd.replace('-', '_')
        # Only real commands get argument completion; other ``complete_*``
        # helpers on the shell take extra parameters and must not be called here.
        if not hasattr(self.shell, 'do_' + method_suffix):
            return
        completer = getattr(self.shell, 'complete_' + method_suffix, None)
        if completer is None:
            return
        # ``arg`` is exactly the text between the command and the cursor, so the
        # cursor sits at its end. Deriving the position from the outer document
        # breaks with leading or repeated whitespace.
        arg_document = Document(arg, cursor_position=len(arg))
        yield from completer(arg_document, complete_event)

    def list_commands(self):
        """Return the shell's command names (``do_foo_bar`` -> ``foo-bar``)."""
        return [
            attr[3:].replace('_', '-') for attr in dir(self.shell)
            if attr.startswith('do_') and callable(getattr(self.shell, attr))
        ]
class Shell:
    """Interactive or one-shot command shell over a consumer connection.

    Commands are coroutine methods named ``do_<command>`` (dashes in the typed
    command map to underscores). The first line of a command's docstring is its
    usage string; it is shown by ``help`` and by the completer. Optional
    ``complete_<command>`` methods provide argument completion.

    When ``command`` is given, that single command is executed and ``run``
    returns; otherwise commands are read from an interactive prompt.
    """

    def __init__(self, consumer_conn, command=None):
        self.consumer_conn = consumer_conn
        self.loop = asyncio.get_event_loop()
        self.browser = ServiceBrowser(connection=consumer_conn, loop=self.loop)
        self.browser.add_services_update_hook(self.on_services_update)
        self.lookup = self.browser.lookup
        self.command = command
        # No explicit ``loop=``: the argument was removed in Python 3.10, and
        # on older versions the default is the same current event loop.
        self.new_services_event = asyncio.Event()
        self.prepare_history()

    def prepare_history(self):
        """Set ``self.history`` to a file-backed history, or in-memory on I/O errors.

        The file location comes from ``CSP_MEADOW_HISTORY_PATH`` and defaults to
        ``~/.meadow_history``; the file is created if it does not exist.
        """
        raw_path = os.environ.get('CSP_MEADOW_HISTORY_PATH', '~/.meadow_history')
        # Use a plain string so os.path / open / FileHistory also work where
        # path-like objects are not accepted.
        history_path = str(Path(raw_path).expanduser())
        try:
            if not os.path.isfile(history_path):
                with open(history_path, 'w'):
                    pass
            self.history = FileHistory(history_path)
        except IOError as e:
            logger.error('Could not use file history (%s): %r', history_path, e)
            self.history = InMemoryHistory()

    @property
    def services(self):
        """Services currently known to the browser's lookup."""
        return self.lookup.services

    async def on_services_update(self, added, removed):
        """Browser hook: wake up coroutines blocked in ``wait_for_service``."""
        self.new_services_event.set()

    async def wait_for_service(self, service_name):
        """Block until ``service_name`` appears in ``self.services``."""
        while service_name not in self.services:
            await self.new_services_event.wait()
            self.new_services_event.clear()

    async def run(self):
        """Run the command loop until EOF, cancellation, or the one-shot command is done."""
        if not self.command:
            print('Welcome to meadow. Enjoy!')

        while True:
            try:
                if self.command:
                    cmd = self.command
                else:
                    cmd = await prompt_async('(meadow) ',
                                             patch_stdout=True,
                                             completer=ShellCompleter(self),
                                             history=self.history)
                cmd = cmd.strip()
                parts = cmd.split()
                if not parts:
                    if self.command:
                        # A whitespace-only one-shot command would otherwise
                        # spin here forever without yielding to the event loop.
                        break
                    continue

                method_name = 'do_' + parts[0].replace('-', '_')
                if hasattr(self, method_name):
                    # Pass everything after the command name (and its separator).
                    await getattr(self, method_name)(cmd[len(parts[0]) + 1:])
                else:
                    print('Unknown command %r. Type "help" for help.' % parts[0])
            except EOFError:
                print('Bye.')
                break
            except asyncio.CancelledError:
                break
            except Exception as e:  # pylint: disable=broad-except
                # Keep the shell alive: report the failure and prompt again.
                print('Exception %r' % e)

            if self.command:
                break

    async def do_list_services(self, _):
        """ list-services
        List all available services.
        """
        for name in self.lookup.services:
            print(name)

    async def do_help(self, _):
        """ help
        Print this message.
        """
        for method_name in dir(self):
            if not method_name.startswith('do_'):
                continue
            # Undocumented commands still get listed, by their command name.
            doc = inspect.getdoc(getattr(self, method_name))
            if doc is None:
                doc = method_name[3:].replace('_', '-')
            usage, *description = doc.split('\n')
            click.secho(usage, bold=True)
            click.secho('\n'.join('\t' + d for d in description))

    def complete_service_and_characteristic(self, document, complete_event, cmd):
        """Complete a service name (first word) or a characteristic name (second word).

        ``cmd`` is forwarded to ``complete_characteristic_name`` to filter the
        characteristics offered.
        """
        words = document.current_line_before_cursor.split()
        word_number = len(words)
        if document.char_before_cursor == ' ':
            # Cursor is past a separator, so a new word is being started.
            word_number += 1

        if word_number <= 1:
            yield from self.complete_service_name(document, complete_event)
        elif word_number == 2:
            yield from self.complete_characteristic_name(document,
                                                         complete_event,
                                                         words[0],
                                                         cmd=cmd)

    def complete_service_name(self, document, complete_event):
        """Yield completions for known service names, with the service type as meta."""
        names = sorted(self.services.keys())
        meta = {name: self.services[name].type for name in names}
        completer = WordCompleter(names, WORD=True, meta_dict=meta, match_middle=True)
        yield from completer.get_completions(document, complete_event)

    def complete_characteristic_name(self, document, complete_event, service_name, cmd):
        """Yield completions for characteristic names of ``service_name``.

        For ``cmd == 'write'`` only writable characteristics are offered, for
        ``cmd == 'read'`` only readable ones, otherwise all of them. Yields
        nothing when the service is unknown.
        """
        try:
            service = self.services[service_name]
        except KeyError:
            return
        chars = list(service.characteristics.values())
        if cmd == 'write':
            offered = [char for char in chars if char.writable]
        elif cmd == 'read':
            offered = [char for char in chars if char.readable]
        else:
            offered = chars
        names = sorted(char.name for char in offered)
        meta = {char.name: char.value_type for char in chars}
        completer = WordCompleter(names, WORD=True, meta_dict=meta)
        yield from completer.get_completions(document, complete_event)

    async def get_characteristic_or_inform(self, service_name, characteristic_name):
        """Return the named characteristic, or print the reason and return ``None``.

        The service is looked up in ``self.services`` and the characteristic is
        matched by its ``name`` among ``service.characteristics``.
        """
        try:
            service = self.services[service_name]
        except KeyError:
            print('Unknown service %r.' % service_name)
            return None
        for char in service.characteristics.values():
            if char.name == characteristic_name:
                return char
        print('Service %r has no characteristic %r.' % (service_name, characteristic_name))
        return None

    @utils.measure_time_async
    async def do_write(self, arg):
        """ write <service> <characteristic> [value] [meta]
        Write to characteristic.
        If value is omitted, `None` will be used.
        Value and meta are parsed as YAML.
        """
        parts = shlex.split(arg)
        if len(parts) == 4:
            service_name, characteristic_name, raw_value, raw_meta = parts
            # safe_load: the text comes straight from the command line, and a
            # Loader-less yaml.load() is rejected by current PyYAML releases.
            meta = yaml.safe_load(raw_meta)
        elif len(parts) == 3:
            service_name, characteristic_name, raw_value = parts
            meta = dict()
        elif len(parts) == 2:
            service_name, characteristic_name = parts
            raw_value, meta = 'null', dict()
        else:
            print('Usage: write <service> <characteristic> [value=None] [meta]')
            return

        characteristic = await self.get_characteristic_or_inform(service_name, characteristic_name)
        if characteristic is None:
            return

        if characteristic.value_type == 'bytes':
            import ast  # local import: only needed for bytes values

            # Interpret the text as the body of a bytes literal (so escapes such
            # as \x00 work). literal_eval, unlike eval, cannot execute code
            # smuggled in through an embedded quote.
            value = ast.literal_eval('b"{}"'.format(raw_value))
        else:
            value = yaml.safe_load(raw_value)
        await characteristic.write(value, extra=meta)

    def complete_write(self, document, complete_event):
        """Argument completion for ``write``."""
        return self.complete_service_and_characteristic(document, complete_event, cmd='write')

    @utils.measure_time_async
    async def do_read(self, arg):
        """ read <service> <characteristic>
        Read characteristic value.
        """
        parts = arg.split()
        if len(parts) != 2:
            print('Usage: read <service> <characteristic>')
            return
        characteristic = await self.get_characteristic_or_inform(parts[0], parts[1])
        if characteristic is None:
            return
        value = await characteristic.read()
        print(value)

    def complete_read(self, document, complete_event):
        """Argument completion for ``read``."""
        return self.complete_service_and_characteristic(document, complete_event, cmd='read')

    async def do_listen(self, arg):
        """ listen [filter]
        Listen for characteristic notifications.
        Filter may include "*" to match any string.
        Default filter is "*".
        """
        pattern_c = create_pattern_re(arg.strip() or '*')

        def should_be_presented(service_name):
            return bool(pattern_c.fullmatch(service_name))

        def print_header(service, characteristic):
            # Common "[timestamp] service characteristic " prefix of every line.
            timestamp = datetime.datetime.now().strftime(datetime_fmt())
            click.secho('[%s] ' % timestamp, bold=True, nl=False, err=False)
            click.secho(service + ' ', fg='green', nl=False)
            click.secho(characteristic + ' ', fg='blue', nl=False)

        def on_value_notification(value, extra, service, characteristic, **_):
            if not should_be_presented(service):
                return
            # Deserialize when the service is known; otherwise show the raw value.
            service_instance = self.browser.lookup.find(service)
            if service_instance:
                characteristic_instance = service_instance[characteristic]
                value = characteristic_instance.deserialize_value(value)
            print_header(service, characteristic)
            if isinstance(value, float):  # Better formatting for float
                value = '{:0.3f}'.format(value)
            click.secho(str(value), bold=True, nl=False)
            if extra:
                click.secho(' ' + str(extra))
            else:
                click.secho('')

        def on_error_notification(error, service, characteristic, **_):
            if not should_be_presented(service):
                return
            print_header(service, characteristic)
            click.secho(repr(error), bold=True, fg='red')

        loop = asyncio.get_event_loop()
        ctrl_c_future = loop.create_future()

        def on_control_c_signal():
            # A repeated Ctrl+C may arrive before the handler is removed;
            # resolving an already-resolved future would raise.
            if not ctrl_c_future.done():
                ctrl_c_future.set_result(None)

        subscription = await self.consumer_conn.subscribe(None, None, on_value_notification,
                                                          on_error_notification)
        try:
            # Inside the try so the subscription is released even if installing
            # the signal handler fails.
            loop.add_signal_handler(signal.SIGINT, on_control_c_signal)
            try:
                print('Hit Ctrl+C to stop listening...')
                await ctrl_c_future
            finally:
                loop.remove_signal_handler(signal.SIGINT)
        finally:
            subscription.unsubscribe()
            print()

    def complete_listen(self, document, complete_event):
        """Argument completion for ``listen``: service names."""
        yield from self.complete_service_name(document, complete_event)
# Presumably maps each supported metadata key to the Python type of its value.
# NOTE(review): nothing in the visible part of this module reads this table —
# confirm it is still used before relying on or extending it.
METADATA_TYPES = {'friendly-name': str, 'join-with-room': str}
async def run_shell(consumer_conn, command=None):
    """Create a :class:`Shell` on ``consumer_conn`` and run it to completion.

    ``command`` is passed through to the shell unchanged.
    """
    shell = Shell(consumer_conn, command=command)
    await shell.run()