Source code for caspia.node.components.serial

import asyncio
import struct
from dataclasses import dataclass, field

from caspia.node import errors, utils
from caspia.node.events import BroadcastEvent

from .base import Component


[docs]class Serial(Component): type = 0x09
[docs] @classmethod def parse_broadcast(cls, broadcast): if broadcast.is_component_broadcast: if broadcast.component_data[0] == 0x01: return Serial.DataReceivedEvent(broadcast) else: raise errors.CaspiaError('Invalid broadcast') return None
def _chunks(self, l, n): for i in range(0, len(l), n): yield l[i:i + n] async def _on_new_data(self, func, timeout=None): """ func(data) should return True to stop the subscription """ def accept(event): if isinstance(event, Serial.DataReceivedEvent): return func(event.data) return None await utils.receive_event(accept, self.node.client, timeout=timeout)
[docs] async def set_baudrate(self, baudrate): assert baudrate % 100 == 0, 'baudrate must be divisable by 100' await self.request(struct.pack('<BH', 2, baudrate // 100))
[docs] async def write(self, data): for chunk in self._chunks(data, 14): await self.request(b'\x01' + chunk)
[docs] async def read(self, length=None, timeout=None): if length is None and timeout is None: return self.state.read() else: data = self.state.read(length) if len(data) == length: return data def new_data_received(_): nonlocal data if length is not None: data += self.state.read(length - len(data)) else: data += self.state.read() return len(data) == length try: await self._on_new_data(new_data_received, timeout=timeout) except asyncio.TimeoutError: pass return data
[docs] @dataclass class DataReceivedEvent(BroadcastEvent): data: bytes = field(init=False) def __post_init__(self): super().__post_init__() self.data = self.broadcast.component_data[1:]
[docs] class State(Component.State): def __init__(self): self.received_data = bytes()
[docs] def update_from_bytes(self, data): pass
[docs] def update_from_event(self, event): if isinstance(event, Serial.DataReceivedEvent): self.received_data += event.data
[docs] def read(self, max_length=None): if max_length is None: max_length = len(self.received_data) data, self.received_data = \ self.received_data[:max_length], self.received_data[max_length:] return data
def __repr_fields__(self): return dict(pending_received_data=self.received_data.hex())
[docs] class Config(Component.Config): def __init__(self, baudrate=None): self.baudrate = baudrate if baudrate is not None else 9600
[docs] def get_bytes(self): return struct.pack('<H', self.baudrate // 100)
[docs] @classmethod def from_bytes(cls, data): return cls(baudrate=struct.unpack('<H', data)[0] * 100)
def __repr_fields__(self): return dict(baudrate=self.baudrate)