Source code for caspia.homeserver.views.meadow

import asyncio
import json
import logging

from sanic import exceptions, response
from sanic.websocket import ConnectionClosed

from caspia.meadow import errors as meadow_errors
from caspia.meadow.client.connection import ConsumerConnection
from caspia.meadow.services import Characteristic

from .base import View

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


class ServiceView(View):
    """HTTP view that returns one meadow service in serialized form."""

    async def get(self, request, service_name):
        """Respond with the JSON serialization of the service ``service_name``.

        The service is fetched through ``self.meadow_service.get_service``.
        If a ``ValueError`` is raised while fetching, serializing or building
        the response, the result of ``self.error_response(404, ...)`` is
        returned instead.
        """
        unknown_service = f"Service {service_name} is not known"
        try:
            found = await self.meadow_service.get_service(service_name)
            body = found.serialize()
            return response.json(body)
        except ValueError:
            return self.error_response(404, unknown_service)
class ServicesView(View):
    """HTTP view that lists every service offered by ``self.meadow_service``."""

    async def get(self, request):
        """Respond with a JSON array holding each service's serialization.

        ``self.meadow_service.get_services()`` is consumed as an asynchronous
        iterator; the services appear in the order it yields them.
        """
        body = []
        async for svc in self.meadow_service.get_services():
            body.append(svc.serialize())
        return response.json(body)
class CharacteristicValueView(View):
    """HTTP view for reading (GET) and writing (PUT) one characteristic value."""

    async def get_characteristic(self, service_name, characteristic_name) -> Characteristic:
        """Look up ``characteristic_name`` on the service ``service_name``.

        Raises:
            sanic.exceptions.NotFound: when the lookup raises ``ValueError``
                (reported as an unknown service) or ``KeyError`` (reported as
                a characteristic the service does not have).
        """
        try:
            service = await self.meadow_service.get_service(service_name)
            return service[characteristic_name]
        except ValueError as error:
            raise exceptions.NotFound(f"Service {service_name} is not known") from error
        except KeyError as error:
            msg = f"Service {service_name} does not have characteristic {characteristic_name}"
            raise exceptions.NotFound(msg) from error

    @staticmethod
    def parse_value_and_extra(payload, characteristic: Characteristic):
        """Split a request payload into the characteristic value and extras.

        The payload is never modified; all work happens on a copy.

        Args:
            payload: Decoded JSON request body. Must be a JSON object with a
                ``'value'`` entry, except for ``'void'`` characteristics where
                the body may be ``None`` and any supplied value is ignored.
            characteristic: Characteristic whose ``value_type`` selects the
                coercion (``'integer'`` -> ``int``, ``'float'`` -> ``float``)
                applied before ``characteristic.deserialize_value`` is called.

        Returns:
            A ``(value, extra)`` tuple where ``extra`` holds every payload
            entry other than ``'value'``.

        Raises:
            sanic.exceptions.InvalidUsage: when the payload is not a JSON
                object, lacks ``'value'`` for a non-void characteristic, or
                carries a value that cannot be coerced to the numeric type.
        """
        value_type = characteristic.value_type

        if value_type == 'void' and payload is None:
            # Void characteristics carry no value, so the body may be omitted.
            payload = {}
        if not isinstance(payload, dict):
            raise exceptions.InvalidUsage("Request body must be a JSON object")

        # Work on a copy so the caller's payload (request.json) stays intact.
        fields = dict(payload)
        if value_type == 'void':
            fields['value'] = None
        elif 'value' not in fields:
            raise exceptions.InvalidUsage("Request body must contain a 'value' field")

        raw = fields['value']
        try:
            if value_type == 'integer':
                raw = int(raw)
            elif value_type == 'float':
                raw = float(raw)
        except (TypeError, ValueError, OverflowError):
            # Client sent something non-numeric: a bad request, not a server error.
            message = f"Value {raw!r} is not a valid {value_type}"
            raise exceptions.InvalidUsage(message) from None

        value = characteristic.deserialize_value(raw)
        extra = {k: v for k, v in fields.items() if k != 'value'}
        return value, extra

    def translate_meadow_error(self, error, *, on_read):
        """Turn a meadow error into a JSON error response.

        The response body is ``error.to_dict()``. The status is 400 for an
        ``InvalidValueError`` when ``on_read`` is true, 405 for a
        ``NotSupportedError`` and 504 for everything else.

        NOTE(review): ``InvalidValueError`` yields 400 only on reads, while on
        writes it falls through to 504. This may be an inverted condition;
        confirm the intended mapping before relying on it.
        """
        if isinstance(error, meadow_errors.InvalidValueError) and on_read:
            status = 400
        elif isinstance(error, meadow_errors.NotSupportedError):
            status = 405
        else:
            status = 504
        return response.json(error.to_dict(), status=status)

    async def get(self, request, service_name, characteristic_name):
        """Read the characteristic and respond with its serialized value.

        The JSON body contains ``value`` plus every entry of the ``extra``
        mapping returned by ``characteristic.read(extra=True)``. A
        ``MeadowError`` raised by the read is converted with
        ``translate_meadow_error``.
        """
        characteristic = await self.get_characteristic(service_name, characteristic_name)
        try:
            value, extra = await characteristic.read(extra=True)
        except meadow_errors.MeadowError as error:
            return self.translate_meadow_error(error, on_read=True)
        serialized_value = characteristic.serialize_value(value)
        return response.json(dict(value=serialized_value, **extra))

    async def put(self, request, service_name, characteristic_name):
        """Write the value from the JSON request body to the characteristic.

        Responds with an empty JSON object on success. A ``MeadowError``
        raised by the write is converted with ``translate_meadow_error``.
        """
        characteristic = await self.get_characteristic(service_name, characteristic_name)
        value, extra = self.parse_value_and_extra(request.json, characteristic)
        try:
            await characteristic.write(value, extra=extra)
        except meadow_errors.MeadowError as error:
            return self.translate_meadow_error(error, on_read=False)
        return response.json({})
class NotificationsView(View):
    """Websocket view that forwards characteristic notifications to a client."""

    @property
    def consumer_conn(self) -> ConsumerConnection:
        """The ``consumer_conn`` attribute of ``self.meadow_service``."""
        return self.meadow_service.consumer_conn

    async def ws(self, request, ws):  # pylint: disable=too-many-branches
        """Process client messages on ``ws`` until an exception ends the loop.

        Each incoming frame is expected to be a JSON object whose
        ``'message'`` entry is one of:

        * ``'subscribe'`` - subscribe to every ``(service, characteristic)``
          pair in ``'characteristics'`` that is not subscribed yet.
        * ``'unsubscribe'`` - drop the listed pairs that are subscribed.
        * ``'unsubscribe-all'`` - drop every subscription.
        * ``'ping'`` - reply with ``{"message": "pong"}``.

        Frames that are not valid JSON, are not JSON objects or have no
        ``'message'`` entry are logged and skipped, exactly like unknown
        message types. Every remaining subscription is released when the
        loop exits, whatever the reason.
        """
        subscriptions = {}

        async def on_notification(value, extra, service, characteristic):
            # Relay a notification received for a subscription to the client.
            await ws.send(json.dumps({
                'message': 'notification',
                'service': service,
                'characteristic': characteristic,
                'value': value,
                'extra': extra,
            }))

        try:
            while True:
                data = await ws.recv()
                try:
                    msg = json.loads(data)
                except ValueError:
                    # json.JSONDecodeError is a ValueError subclass.
                    logger.warning('ignoring message that is not valid JSON')
                    continue
                if not isinstance(msg, dict) or 'message' not in msg:
                    logger.warning('ignoring message without a message type: %s', msg)
                    continue
                logger.info('message %s', msg)

                kind = msg['message']
                # NOTE(review): a missing or malformed 'characteristics' entry
                # still raises below and ends the connection.
                if kind == 'subscribe':
                    logger.info('message is subscribe')
                    for service_name, characteristic_name in msg['characteristics']:
                        key = (service_name, characteristic_name)
                        if key in subscriptions:
                            logger.info('already subscribed to %s', key)
                            continue
                        subscription = await self.consumer_conn.subscribe(
                            service_name, characteristic_name, on_notification)
                        logger.info('successfully subscribed to %s', key)
                        subscriptions[key] = subscription
                elif kind == 'unsubscribe':
                    for service_name, characteristic_name in msg['characteristics']:
                        key = (service_name, characteristic_name)
                        if key not in subscriptions:
                            continue
                        # Forget the entry first so the final cleanup cannot
                        # unsubscribe it a second time.
                        subscriptions.pop(key).unsubscribe()
                elif kind == 'unsubscribe-all':
                    while subscriptions:
                        _, subscription = subscriptions.popitem()
                        subscription.unsubscribe()
                elif kind == 'ping':
                    await ws.send(json.dumps({'message': 'pong'}))
                else:
                    logger.warning('invalid message type %s', kind)
        finally:
            for subscription in subscriptions.values():
                subscription.unsubscribe()

    async def get(self, request):
        """Perform the websocket handshake and run ``ws`` for this client.

        The handler runs as a future registered in
        ``request.app.websocket_tasks`` while it is active. Cancellation and
        ``ConnectionClosed`` are treated as a normal end of the session; in
        every case the future is deregistered and the websocket is closed.
        """
        protocol = request.transport.get_protocol()
        ws = await protocol.websocket_handshake(request)

        fut = asyncio.ensure_future(self.ws(request, ws), loop=self.loop)
        request.app.websocket_tasks.add(fut)
        try:
            await fut
        except (asyncio.CancelledError, ConnectionClosed):
            pass
        finally:
            request.app.websocket_tasks.remove(fut)
            await ws.close()