import asyncio
import struct
from dataclasses import dataclass, field
from caspia.node import errors, utils
from caspia.node.events import BroadcastEvent
from .base import Component
[docs]class Serial(Component):
type = 0x09
[docs] @classmethod
def parse_broadcast(cls, broadcast):
if broadcast.is_component_broadcast:
if broadcast.component_data[0] == 0x01:
return Serial.DataReceivedEvent(broadcast)
else:
raise errors.CaspiaError('Invalid broadcast')
return None
def _chunks(self, l, n):
for i in range(0, len(l), n):
yield l[i:i + n]
async def _on_new_data(self, func, timeout=None):
""" func(data) should return True to stop the subscription """
def accept(event):
if isinstance(event, Serial.DataReceivedEvent):
return func(event.data)
return None
await utils.receive_event(accept, self.node.client, timeout=timeout)
[docs] async def set_baudrate(self, baudrate):
assert baudrate % 100 == 0, 'baudrate must be divisable by 100'
await self.request(struct.pack('<BH', 2, baudrate // 100))
[docs] async def write(self, data):
for chunk in self._chunks(data, 14):
await self.request(b'\x01' + chunk)
[docs] async def read(self, length=None, timeout=None):
if length is None and timeout is None:
return self.state.read()
else:
data = self.state.read(length)
if len(data) == length:
return data
def new_data_received(_):
nonlocal data
if length is not None:
data += self.state.read(length - len(data))
else:
data += self.state.read()
return len(data) == length
try:
await self._on_new_data(new_data_received, timeout=timeout)
except asyncio.TimeoutError:
pass
return data
[docs] @dataclass
class DataReceivedEvent(BroadcastEvent):
data: bytes = field(init=False)
def __post_init__(self):
super().__post_init__()
self.data = self.broadcast.component_data[1:]
[docs] class State(Component.State):
def __init__(self):
self.received_data = bytes()
[docs] def update_from_bytes(self, data):
pass
[docs] def update_from_event(self, event):
if isinstance(event, Serial.DataReceivedEvent):
self.received_data += event.data
[docs] def read(self, max_length=None):
if max_length is None:
max_length = len(self.received_data)
data, self.received_data = \
self.received_data[:max_length], self.received_data[max_length:]
return data
def __repr_fields__(self):
return dict(pending_received_data=self.received_data.hex())
[docs] class Config(Component.Config):
def __init__(self, baudrate=None):
self.baudrate = baudrate if baudrate is not None else 9600
[docs] def get_bytes(self):
return struct.pack('<H', self.baudrate // 100)
[docs] @classmethod
def from_bytes(cls, data):
return cls(baudrate=struct.unpack('<H', data)[0] * 100)
def __repr_fields__(self):
return dict(baudrate=self.baudrate)