Source code for caspia.gateway.services.serial

from caspia.gateway.services import GatewayService
from caspia.meadow.services import SerialBase
from caspia.node import components


class Serial(GatewayService, SerialBase):
    """Gateway service backed by a ``components.Serial`` node component.

    Reads and writes of the ``receive``, ``data_available``, ``send`` and
    ``baudrate`` characteristics are forwarded to the configured serial
    component; every other characteristic is delegated to the parent
    implementation.

    NOTE(review): the characteristics themselves are presumably declared
    by ``SerialBase`` -- they are not defined in this class.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base classes; no component is bound yet."""
        super().__init__(*args, **kwargs)
        # Populated by configure().
        self.serial_c = None

    def configure(self):
        """Look up the serial component named by ``self.config['serial']``.

        The component is resolved through ``self.network`` for the node
        returned by ``self.get_node()`` and stored in ``self.serial_c``.
        """
        resolve = self.network.get_configured_component
        node = self.get_node()
        component_type = components.Serial
        settings = self.config['serial']
        self.serial_c = resolve(node, component_type, settings)

    @property
    def dependant_components(self):
        """Return a set containing only the bound serial component."""
        required = {self.serial_c}
        return required

    async def _inform_about_buffer(self):
        """Notify ``data_available`` with whether received data is pending.

        The notification is sent with ``if_changed=True``.
        """
        pending = self.serial_c.state.received_data
        has_data = len(pending) > 0
        await self.notify(self.data_available, has_data, if_changed=True)

    async def on_component_event(self, component, event):
        """Refresh ``data_available`` when the serial component receives data.

        Events from other components, and events that are not
        ``components.Serial.DataReceivedEvent``, are ignored.
        """
        if component is not self.serial_c:
            return
        if not isinstance(event, components.Serial.DataReceivedEvent):
            return
        await self._inform_about_buffer()

    async def characteristic_read(self, characteristic, **kwargs):
        """Read a characteristic value.

        * ``receive`` -- returns ``self.serial_c.state.read()`` and then
          re-publishes the ``data_available`` state.
        * ``data_available`` -- returns ``True`` when the component's
          ``received_data`` is non-empty.
        * anything else -- delegated to the parent class.
        """
        if characteristic is self.receive:
            data = self.serial_c.state.read()
            # The read may have changed the buffer, so re-publish its state.
            await self._inform_about_buffer()
            return data

        if characteristic is self.data_available:
            return len(self.serial_c.state.received_data) > 0

        return await super().characteristic_read(characteristic, **kwargs)

    async def characteristic_write(self, characteristic, value, **kwargs):
        """Write a characteristic value.

        * ``send`` -- passes ``value`` to the component's ``write``.
        * ``baudrate`` -- passes ``value`` to the component's ``set_baudrate``.
        * anything else -- delegated to the parent class, whose result is
          returned.

        The ``send`` and ``baudrate`` branches return ``None``.
        """
        if characteristic is self.send:
            await self.serial_c.write(value)
            return None

        if characteristic is self.baudrate:
            await self.serial_c.set_baudrate(value)
            return None

        return await super().characteristic_write(characteristic, value, **kwargs)