Source code for caspia.toolbox.integrations.philipshue.cli

import asyncio
import os.path

import click
import yaml

from caspia.meadow.client import Gateway
from caspia.toolbox.cli import async_command, config_option, logging_options, storage_option
from caspia.toolbox.storage import ShelveStorage

from .api import HueApi
from .service import HueServicesProvider


[docs]def load_config(path): if path.exists(): with open(path) as f: return yaml.load(f) else: return dict()
@click.command() @config_option(default='philips-hue.yaml') @storage_option(default='philips-hue') @click.option('--pair', is_flag=True) @logging_options() @async_command() async def run_config(storage, config, pair): # pylint: disable=too-many-locals # load the config config_data = load_config(config) # connect to hue storage = ShelveStorage(str(storage / 'data')) if pair and 'user_name' in storage: del storage['user_name'] hue = HueApi(storage=storage) print('connecting ... (push the big button, if its taking a long time)') await hue.connect() print('successfully connected') for lightid, light in (await hue.list_lights()).items(): huename = light['name'] # find existing configuration for this light default_name = '' for name, configured_light in config_data.get('lights', {}).items(): if configured_light.get('hue-identifier') == lightid: default_name = name break entered_name = input(f'Enter name for light "{huename}" [{default_name}]: ') resolved_name = entered_name or default_name if resolved_name: light_config = {'hue-identifier': lightid} lights = config_data.get('lights', {}) lights[resolved_name] = light_config config_data['lights'] = lights # save the config print('Saving the config') os.makedirs(os.path.dirname(config), exist_ok=True) with open(config, 'w') as f: yaml.dump(config_data, f) @click.command() @config_option(default='philips-hue.yaml') @storage_option(default='philips-hue') @click.option('--broker-url', envvar='BROKER_URL', type=str, default='localhost') @click.option('--name', default='hue') @logging_options() @async_command async def run(broker_url, name, config, storage): storage = ShelveStorage(str(storage / 'data')) hue = HueApi(storage=storage) await hue.connect() services_provider = HueServicesProvider(hue, load_config(config)) await services_provider.setup() gateway = Gateway(name, broker_url=broker_url) gateway.add(services_provider.services) while True: await asyncio.sleep(60)