import struct
from dataclasses import dataclass, field
from caspia.node.events import BroadcastEvent
from .base import Component
[docs]class AnalogOutput(Component):
type = 0x0C
[docs] @Component.does_request(lambda: b'\x10')
async def off(self, response):
self.state.update_from_bytes(response)
return self.state
[docs] @Component.does_request(lambda: b'\x11')
async def on(self, response):
self.state.update_from_bytes(response)
return self.state
[docs] @Component.does_request(lambda: b'\x12')
async def toggle(self, response):
self.state.update_from_bytes(response)
return self.state
[docs] async def set(self, value):
data = struct.pack('<BB', 0x20, AnalogOutput.encode_value(value))
response = await self.request(data)
self.state.update_from_bytes(response)
return self.state
[docs] @classmethod
def parse_broadcast(cls, broadcast):
if broadcast.component_data[0] == 0x00:
return AnalogOutput.StateOffEvent(broadcast)
elif broadcast.component_data[0] == 0x01:
return AnalogOutput.StateOnEvent(broadcast)
return None
[docs] @classmethod
def decode_value(cls, value):
return float((value & 0x7F) / 0x7F)
[docs] @classmethod
def encode_value(cls, value):
assert 0 <= value <= 1
return int(value * 0x7F)
[docs] @dataclass
class StateOnEvent(BroadcastEvent, Component.Event):
is_on: bool = field(init=False)
value: float = field(init=False)
def __post_init__(self):
super().__post_init__()
self.is_on = True
self.value = AnalogOutput.decode_value(self.broadcast.component_data[2])
[docs] @dataclass
class StateOffEvent(BroadcastEvent, Component.Event):
is_on: bool = field(init=False)
value: float = field(init=False)
def __post_init__(self):
super().__post_init__()
self.is_on = False
self.value = 0.0
[docs] @dataclass
class State(Component.State):
is_on: bool = None
value: float = None
error: int = None
[docs] def update_from_bytes(self, data):
is_on, value, error = struct.unpack('<BBB', data[0:3])
self.is_on = bool(is_on)
self.value = AnalogOutput.decode_value(value)
self.error = error
[docs] def update_from_event(self, event):
if isinstance(event, (AnalogOutput.StateOnEvent, AnalogOutput.StateOffEvent)):
self.is_on = event.is_on
self.value = event.value
[docs] @dataclass
class Config(Component.Config):
output_number: int
initial_state: float
[docs] def get_bytes(self):
return struct.pack('<BB', self.output_number,
AnalogOutput.encode_value(self.initial_state))
[docs] @classmethod
def from_bytes(cls, data):
output_number, initial_state = struct.unpack('<BB', data[0:2])
return cls(output_number=output_number, initial_state=initial_state)