import struct
from dataclasses import dataclass, field
from caspia.node import pin
from caspia.node.events import BroadcastEvent
from .base import Component
[docs]class Blinds(Component):
NO_MOVEMENT = 0x00
MOVEMENT_UP = 0x01
MOVEMENT_DOWN = 0x02
INVALID_TILT = 0x7FFFFFFF
INVALID_BLIND = 0x7FFFFFFF
type = 0x05
[docs] @classmethod
def parse_broadcast(cls, broadcast):
if broadcast.component_data[0] == 0x00:
return Blinds.StateUpdateEvent(broadcast)
return None
[docs] @Component.does_request(lambda: b'\x01')
async def move_up(self, response):
pass
[docs] @Component.does_request(lambda: b'\x02')
async def move_down(self, response):
pass
[docs] @Component.does_request(lambda: b'\x03')
async def stop_movement(self, response):
pass
[docs] @Component.does_request(lambda: b'\x04')
async def calibrate(self, response):
pass
[docs] async def set_target_position(self, blind, tilt, relative=True):
if relative:
blind = await self.blind_to_absolute(blind)
tilt = await self.tilt_to_absolute(tilt)
print(blind, tilt)
data = struct.pack('<Bii', 0x09, int(blind), int(tilt))
await self.request(data)
[docs] async def set_target_tilt(self, tilt, relative=True):
if relative:
tilt = await self.tilt_to_absolute(tilt)
data = struct.pack('<Bi', 0x05, int(tilt))
await self.request(data)
[docs] async def set_target_blind(self, blind, relative=True):
if relative:
blind = await self.blind_to_absolute(blind)
data = struct.pack('<Bi', 0x06, int(blind))
await self.request(data)
[docs] async def load_timing(self):
""" Loads max_tilt and max_blind from the node (expressed in ms) """
data = await self.request(b'\x07')
self._timing_cached = [ms / 1000 for ms in struct.unpack('<ii', data)]
return self._timing_cached
[docs] async def set_internal_state(self, blind, tilt, backlash=0, calibrated=True, relative=True):
""" Set the internal state of blinds (backlash is always in seconds) """
if relative:
blind = await self.blind_to_absolute(blind)
tilt = await self.tilt_to_absolute(tilt)
backlash = int(backlash * 1000)
data = struct.pack('<BIIIB', 0x08, blind, tilt, backlash, int(calibrated))
await self.request(data)
[docs] async def tilt_to_absolute(self, tilt):
""" Convert tilt from 0..1 to ms """
max_tilt, _ = await self.get_timing()
return int(max_tilt * tilt * 1000)
[docs] async def tilt_to_relative(self, tilt):
""" Convert tilt from ms to 0..1"""
max_tilt, _ = await self.get_timing()
result = tilt / (max_tilt * 1000)
return min(0.0, max(1.0, result))
[docs] async def blind_to_absolute(self, blind):
""" Convert blind from 0..1 to ms """
_, max_blind = await self.get_timing()
return int(max_blind * blind * 1000)
[docs] async def blind_to_relative(self, blind):
""" Convert blind from ms to 0..1"""
_, max_blind = await self.get_timing()
result = blind / (max_blind * 1000)
return min(0.0, max(1.0, result))
[docs] async def get_timing(self, allow_cached=True):
""" Returns (maximum_tilt, maximum_blind) expressed in milliseconds """
if self.config:
max_blind = self.config.move_down_time - self.config.backlash - self.config.tilt_time
max_tilt = self.config.tilt_time
return max_tilt, max_blind
elif hasattr(self, '_timing_cached') and allow_cached:
return self._timing_cached
else:
return await self.load_timing()
[docs] @dataclass
class StateUpdateEvent(BroadcastEvent, Component.Event):
movement: int = field(init=False)
tilt: int = field(init=False)
blind: int = field(init=False)
def __post_init__(self):
super().__post_init__()
self.movement, tilt, blind = struct.unpack('<BhH', self.broadcast.component_data[1:])
self.tilt = tilt * 10
self.blind = blind * 100
[docs] @dataclass
class State(Component.State):
movement: int = None
blind: int = None # in ms from top
tilt: int = None # in ms from horizontal (0) tilt
target_blind: int = None # INVALID_BLIND if not set
target_tilt: int = None # INVALID_TILT if not set
[docs] def update_from_bytes(self, data):
movement, blind, tilt, tblind, ttilt = struct.unpack('<Biiii', data)
self.movement = movement
self.blind = blind
self.tilt = tilt
self.target_blind = tblind
self.target_tilt = ttilt
[docs] def update_from_event(self, event):
if isinstance(event, Blinds.StateUpdateEvent):
self.movement = event.movement
self.blind = event.blind
self.tilt = event.tilt
[docs] @dataclass
class Config(Component.Config):
pwr_pin: pin.Pin
dir_pin: pin.Pin
move_up_delay: float
move_down_delay: float
direction_change_delay: float
tilt_time: float
move_up_time: float
move_down_time: float
arrest_time: float
backlash: int = 0.0
dir_pin_inverted: bool = False
calibrate_at_bottom: bool = False
def __post_init__(self):
if self.dir_pin_inverted is None:
self.dir_pin_inverted = False
if self.calibrate_at_bottom is None:
self.calibrate_at_bottom = False
[docs] def get_bytes(self):
times = [
self.move_up_delay, self.move_down_delay, self.direction_change_delay,
self.tilt_time, self.move_up_time, self.move_down_time, self.arrest_time,
self.backlash
]
args = [int(time * 1000) for time in times]
timing = struct.pack('<iiiiiiii', *args)
flags = (bool(self.dir_pin_inverted) << 0) | (bool(self.calibrate_at_bottom) << 1)
return bytes([self.pwr_pin.number, self.dir_pin.number, flags]) + timing
[docs] @classmethod
def from_bytes(cls, data):
timing = struct.unpack('<iiiiiiii', data[3:])
times = [ms / 1000 for ms in timing]
return cls(pwr_pin=pin.Pin(data[0]),
dir_pin=pin.Pin(data[1]),
move_up_delay=times[0],
move_down_delay=times[1],
direction_change_delay=times[2],
tilt_time=times[3],
move_up_time=times[4],
move_down_time=times[5],
arrest_time=times[6],
backlash=times[7],
dir_pin_inverted=bool(data[2] & (1 << 0)),
calibrate_at_bottom=bool(data[2] & (1 << 1)))