import asyncio
import json
import logging
from sanic import exceptions, response
from sanic.websocket import ConnectionClosed
from caspia.meadow import errors as meadow_errors
from caspia.meadow.client.connection import ConsumerConnection
from caspia.meadow.services import Characteristic
from .base import View
logger = logging.getLogger(__name__)
[docs]class ServiceView(View):
[docs] async def get(self, request, service_name):
try:
service = await self.meadow_service.get_service(service_name)
return response.json(service.serialize())
except ValueError:
return self.error_response(404, f"Service {service_name} is not known")
[docs]class ServicesView(View):
[docs] async def get(self, request):
services = self.meadow_service.get_services()
serialized = [s.serialize() async for s in services]
return response.json(serialized)
[docs]class CharacteristicValueView(View):
[docs] async def get_characteristic(self, service_name, characteristic_name) -> Characteristic:
try:
service = await self.meadow_service.get_service(service_name)
characteristic = service[characteristic_name]
return characteristic
except ValueError:
raise exceptions.NotFound(f"Service {service_name} is not known")
except KeyError:
msg = f"Service {service_name} does not have characteristic {characteristic_name}"
raise exceptions.NotFound(msg)
[docs] @staticmethod
def parse_value_and_extra(payload, characteristic: Characteristic):
if characteristic.value_type == 'void':
if payload is None:
payload = {}
payload['value'] = None
if characteristic.value_type == 'integer':
payload['value'] = int(payload['value'])
elif characteristic.value_type == 'float':
payload['value'] = float(payload['value'])
value = characteristic.deserialize_value(payload['value'])
extra = {k: v for k, v in payload.items() if k != 'value'}
return value, extra
[docs] def translate_meadow_error(self, error, *, on_read):
if isinstance(error, meadow_errors.InvalidValueError) and on_read:
status = 400
elif isinstance(error, meadow_errors.NotSupportedError):
status = 405
else:
status = 504
return response.json(error.to_dict(), status=status)
[docs] async def get(self, request, service_name, characteristic_name):
characteristic = await self.get_characteristic(service_name, characteristic_name)
try:
value, extra = await characteristic.read(extra=True)
except meadow_errors.MeadowError as error:
return self.translate_meadow_error(error, on_read=True)
serialized_value = characteristic.serialize_value(value)
return response.json(dict(value=serialized_value, **extra))
[docs] async def put(self, request, service_name, characteristic_name):
characteristic = await self.get_characteristic(service_name, characteristic_name)
value, extra = self.parse_value_and_extra(request.json, characteristic)
try:
await characteristic.write(value, extra=extra)
except meadow_errors.MeadowError as error:
return self.translate_meadow_error(error, on_read=False)
return response.json(dict())
[docs]class NotificationsView(View):
@property
def consumer_conn(self) -> ConsumerConnection:
return self.meadow_service.consumer_conn
[docs] async def ws(self, request, ws):
# pylint: disable=too-many-branches
subscriptions = dict()
async def on_notification(value, extra, service, characteristic):
msg = {
'message': 'notification',
'service': service,
'characteristic': characteristic,
'value': value,
'extra': extra,
}
data = json.dumps(msg)
await ws.send(data)
try:
while True:
data = await ws.recv()
msg = json.loads(data)
logger.info('message %s', msg)
if msg['message'] == 'subscribe':
logger.info('message is subscribe')
for service_name, characteristic_name in msg['characteristics']:
key = (service_name, characteristic_name)
if key in subscriptions:
logger.info('already subscribed to %s', key)
continue
subscription = await self.consumer_conn.subscribe(
service_name, characteristic_name, on_notification)
logger.info('successfully subscribed to %s', key)
subscriptions[key] = subscription
elif msg['message'] == 'unsubscribe':
for service_name, characteristic_name in msg['characteristics']:
key = (service_name, characteristic_name)
if key not in subscriptions:
continue
subscription = subscriptions[key]
del subscriptions[key]
subscription.unsubscribe()
elif msg['message'] == 'unsubscribe-all':
while len(subscriptions):
_, subscription = subscriptions.popitem()
subscription.unsubscribe()
elif msg['message'] == 'ping':
msg = {'message': 'pong'}
data = json.dumps(msg)
await ws.send(data)
else:
logger.warning('invalid message type %s', msg['message'])
finally:
for subscription in subscriptions.values():
subscription.unsubscribe()
[docs] async def get(self, request):
protocol = request.transport.get_protocol()
ws = await protocol.websocket_handshake(request)
fut = asyncio.ensure_future(self.ws(request, ws), loop=self.loop)
request.app.websocket_tasks.add(fut)
try:
await fut
except (asyncio.CancelledError, ConnectionClosed):
pass
finally:
request.app.websocket_tasks.remove(fut)
await ws.close()