Source code for caspia.node.components.analogoutput

import struct
from dataclasses import dataclass, field

from caspia.node.events import BroadcastEvent

from .base import Component


class AnalogOutput(Component):
    """Component wrapper for an analog (dimmable) output.

    Levels are exposed to Python callers as floats in ``[0, 1]`` and are
    carried on the wire as a 7-bit integer (``0..0x7F``); see
    :meth:`encode_value` and :meth:`decode_value`.
    """

    # Component type identifier for this class.
    type = 0x0C

    # NOTE(review): ``Component.does_request`` is defined elsewhere; it
    # presumably sends the payload produced by the lambda and hands the reply
    # to the decorated coroutine as ``response`` -- confirm against base.py.

    @Component.does_request(lambda: b'\x10')
    async def off(self, response):
        """Handle the reply to request ``0x10`` and return the updated state."""
        self.state.update_from_bytes(response)
        return self.state

    @Component.does_request(lambda: b'\x11')
    async def on(self, response):
        """Handle the reply to request ``0x11`` and return the updated state."""
        self.state.update_from_bytes(response)
        return self.state

    @Component.does_request(lambda: b'\x12')
    async def toggle(self, response):
        """Handle the reply to request ``0x12`` and return the updated state."""
        self.state.update_from_bytes(response)
        return self.state

    async def set(self, value):
        """Request a specific output level and return the updated state.

        :param value: desired level as a fraction in ``[0, 1]``.
        :raises ValueError: if ``value`` is outside ``[0, 1]``.
        """
        # Request layout: command byte 0x20 followed by the 7-bit level.
        payload = struct.pack('<BB', 0x20, AnalogOutput.encode_value(value))
        response = await self.request(payload)
        self.state.update_from_bytes(response)
        return self.state

    @classmethod
    def parse_broadcast(cls, broadcast):
        """Translate a broadcast into a state event.

        The first byte of ``broadcast.component_data`` selects the event:
        ``0x00`` yields :class:`StateOffEvent`, ``0x01`` yields
        :class:`StateOnEvent`.

        :returns: the matching event, or ``None`` for any other first byte.
        """
        kind = broadcast.component_data[0]
        if kind == 0x00:
            return AnalogOutput.StateOffEvent(broadcast)
        if kind == 0x01:
            return AnalogOutput.StateOnEvent(broadcast)
        return None

    @classmethod
    def decode_value(cls, value):
        """Convert a wire byte into a level in ``[0, 1]``.

        Only the low 7 bits of ``value`` are used; the top bit is ignored.
        """
        return float((value & 0x7F) / 0x7F)

    @classmethod
    def encode_value(cls, value):
        """Convert a level in ``[0, 1]`` into its 7-bit wire representation.

        The scaled value is truncated towards zero, so only exactly ``1``
        maps to ``0x7F``.

        :raises ValueError: if ``value`` is outside ``[0, 1]`` (NaN included).
        """
        # Explicit check instead of ``assert``: assertions are stripped under
        # ``python -O``, which would let out-of-range levels reach the device.
        if not 0 <= value <= 1:
            raise ValueError(f'value must be within [0, 1], got {value!r}')
        return int(value * 0x7F)

    @dataclass
    class StateOnEvent(BroadcastEvent, Component.Event):
        """Broadcast event carrying an "on" state and the reported level."""

        # Derived from the broadcast in __post_init__, not constructor args.
        is_on: bool = field(init=False)
        value: float = field(init=False)

        def __post_init__(self):
            super().__post_init__()
            self.is_on = True
            # The level is read from the third byte of the broadcast payload.
            self.value = AnalogOutput.decode_value(
                self.broadcast.component_data[2])

    @dataclass
    class StateOffEvent(BroadcastEvent, Component.Event):
        """Broadcast event carrying an "off" state; the level is always 0.0."""

        # Derived in __post_init__, not constructor args.
        is_on: bool = field(init=False)
        value: float = field(init=False)

        def __post_init__(self):
            super().__post_init__()
            self.is_on = False
            self.value = 0.0

    @dataclass
    class State(Component.State):
        """Last known state of the output.

        Every field defaults to ``None`` and is filled in by
        :meth:`update_from_bytes` or :meth:`update_from_event`.
        """

        is_on: bool = None
        value: float = None
        error: int = None

        def update_from_bytes(self, data):
            """Refresh all fields from the first three bytes of ``data``.

            Byte order is: on/off flag, 7-bit level, error code.
            """
            on_flag, raw_level, error = struct.unpack('<BBB', data[0:3])
            self.is_on = bool(on_flag)
            self.value = AnalogOutput.decode_value(raw_level)
            self.error = error

        def update_from_event(self, event):
            """Copy ``is_on`` and ``value`` from an on/off event.

            Other event types are ignored; ``error`` is never touched here.
            """
            if isinstance(event, (AnalogOutput.StateOnEvent,
                                  AnalogOutput.StateOffEvent)):
                self.is_on = event.is_on
                self.value = event.value

    @dataclass
    class Config(Component.Config):
        """Persistent configuration of the output.

        ``initial_state`` is a level in ``[0, 1]``; it is stored on the wire
        in the same 7-bit encoding used for live values.
        """

        output_number: int
        initial_state: float

        def get_bytes(self):
            """Serialise to two bytes: output number, encoded initial level.

            :raises ValueError: if ``initial_state`` is outside ``[0, 1]``.
            """
            return struct.pack(
                '<BB',
                self.output_number,
                AnalogOutput.encode_value(self.initial_state),
            )

        @classmethod
        def from_bytes(cls, data):
            """Build a config from the first two bytes of ``data``.

            Inverse of :meth:`get_bytes`: the stored level byte is decoded
            back into a ``[0, 1]`` fraction so the result can be serialised
            again without tripping the range check in ``encode_value``.
            """
            output_number, raw_initial = struct.unpack('<BB', data[0:2])
            return cls(
                output_number=output_number,
                initial_state=AnalogOutput.decode_value(raw_initial),
            )