Source code for caspia.node.components.blinds

import struct
from dataclasses import dataclass, field

from caspia.node import pin
from caspia.node.events import BroadcastEvent

from .base import Component


class Blinds(Component):
    """Node component driving a motorised blind (position and slat tilt).

    Relative values are fractions in the range 0..1; absolute values are
    milliseconds.  The conversion between the two uses the timing returned by
    :meth:`get_timing`, which is expressed in seconds.
    """

    # Values of the ``movement`` field reported by the node.
    NO_MOVEMENT = 0x00
    MOVEMENT_UP = 0x01
    MOVEMENT_DOWN = 0x02

    # Sentinels reported in ``State.target_tilt`` / ``State.target_blind``
    # when no target is set.
    INVALID_TILT = 0x7FFFFFFF
    INVALID_BLIND = 0x7FFFFFFF

    type = 0x05

    @classmethod
    def parse_broadcast(cls, broadcast):
        """Turn a broadcast into an event, or return ``None`` if unrecognised.

        Only broadcasts whose first component-data byte is ``0x00`` are
        understood; they become a :class:`Blinds.StateUpdateEvent`.
        """
        if broadcast.component_data[0] == 0x00:
            return Blinds.StateUpdateEvent(broadcast)
        return None

    @Component.does_request(lambda: b'\x01')
    async def move_up(self, response):
        """Request ``0x01`` (move up); the response is not processed here."""

    @Component.does_request(lambda: b'\x02')
    async def move_down(self, response):
        """Request ``0x02`` (move down); the response is not processed here."""

    @Component.does_request(lambda: b'\x03')
    async def stop_movement(self, response):
        """Request ``0x03`` (stop movement); the response is not processed here."""

    @Component.does_request(lambda: b'\x04')
    async def calibrate(self, response):
        """Request ``0x04`` (calibrate); the response is not processed here."""

    async def set_target_position(self, blind, tilt, relative=True):
        """Set both the target blind position and the target tilt (request ``0x09``).

        :param blind: target blind position
        :param tilt: target tilt
        :param relative: when true, both values are 0..1 fractions and are
            converted to milliseconds first; otherwise they are taken as ms.
        """
        if relative:
            blind = await self.blind_to_absolute(blind)
            tilt = await self.tilt_to_absolute(tilt)

        data = struct.pack('<Bii', 0x09, int(blind), int(tilt))
        await self.request(data)

    async def set_target_tilt(self, tilt, relative=True):
        """Set the target tilt only (request ``0x05``).

        :param tilt: target tilt
        :param relative: when true, ``tilt`` is a 0..1 fraction and is
            converted to milliseconds first; otherwise it is taken as ms.
        """
        if relative:
            tilt = await self.tilt_to_absolute(tilt)

        data = struct.pack('<Bi', 0x05, int(tilt))
        await self.request(data)

    async def set_target_blind(self, blind, relative=True):
        """Set the target blind position only (request ``0x06``).

        :param blind: target blind position
        :param relative: when true, ``blind`` is a 0..1 fraction and is
            converted to milliseconds first; otherwise it is taken as ms.
        """
        if relative:
            blind = await self.blind_to_absolute(blind)

        data = struct.pack('<Bi', 0x06, int(blind))
        await self.request(data)

    async def load_timing(self):
        """Load max_tilt and max_blind from the node (request ``0x07``).

        The node answers with two 32-bit integers in milliseconds; they are
        converted to seconds, cached on the instance and returned as
        ``[max_tilt, max_blind]``.
        """
        data = await self.request(b'\x07')
        self._timing_cached = [ms / 1000 for ms in struct.unpack('<ii', data)]
        return self._timing_cached

    async def set_internal_state(self, blind, tilt, backlash=0, calibrated=True, relative=True):
        """Set the internal state of the blinds (request ``0x08``).

        :param blind: blind position
        :param tilt: tilt
        :param backlash: backlash, always in seconds
        :param calibrated: calibration flag sent to the node
        :param relative: when true, ``blind`` and ``tilt`` are 0..1 fractions
            and are converted to milliseconds first; otherwise they are ms.
        """
        if relative:
            blind = await self.blind_to_absolute(blind)
            tilt = await self.tilt_to_absolute(tilt)

        backlash = int(backlash * 1000)
        # int() keeps this consistent with the set_target_* methods, so float
        # absolute values do not make struct.pack raise.
        # NOTE(review): the fields are packed unsigned ('I'), so a negative
        # value still raises struct.error - confirm against the node protocol.
        data = struct.pack('<BIIIB', 0x08, int(blind), int(tilt), backlash, int(calibrated))
        await self.request(data)

    async def tilt_to_absolute(self, tilt):
        """Convert tilt from 0..1 to ms."""
        max_tilt, _ = await self.get_timing()
        return int(max_tilt * tilt * 1000)

    async def tilt_to_relative(self, tilt):
        """Convert tilt from ms to 0..1 (clamped to that range)."""
        max_tilt, _ = await self.get_timing()
        result = tilt / (max_tilt * 1000)
        # Clamp into [0, 1]; the previous min(0.0, max(1.0, x)) always gave 0.0.
        return max(0.0, min(1.0, result))

    async def blind_to_absolute(self, blind):
        """Convert blind from 0..1 to ms."""
        _, max_blind = await self.get_timing()
        return int(max_blind * blind * 1000)

    async def blind_to_relative(self, blind):
        """Convert blind from ms to 0..1 (clamped to that range)."""
        _, max_blind = await self.get_timing()
        result = blind / (max_blind * 1000)
        # Clamp into [0, 1]; the previous min(0.0, max(1.0, x)) always gave 0.0.
        return max(0.0, min(1.0, result))

    async def get_timing(self, allow_cached=True):
        """Return ``(maximum_tilt, maximum_blind)`` expressed in seconds.

        When a config is available the values are derived from it (and
        ``allow_cached`` has no effect).  Otherwise a previously loaded value
        is reused if ``allow_cached`` is true, else the timing is loaded from
        the node via :meth:`load_timing`.
        """
        if self.config:
            max_blind = self.config.move_down_time - self.config.backlash - self.config.tilt_time
            max_tilt = self.config.tilt_time
            return max_tilt, max_blind

        if allow_cached and hasattr(self, '_timing_cached'):
            return self._timing_cached

        return await self.load_timing()

    @dataclass
    class StateUpdateEvent(BroadcastEvent, Component.Event):
        """Broadcast event carrying the movement, tilt and blind position."""

        movement: int = field(init=False)
        tilt: int = field(init=False)
        blind: int = field(init=False)

        def __post_init__(self):
            super().__post_init__()
            # Payload after the leading type byte: uint8 movement, int16 tilt,
            # uint16 blind (little-endian).
            self.movement, tilt, blind = struct.unpack('<BhH', self.broadcast.component_data[1:])
            # The raw values are scaled up by 10 and 100.
            # NOTE(review): presumably this converts wire units to ms - confirm.
            self.tilt = tilt * 10
            self.blind = blind * 100

    @dataclass
    class State(Component.State):
        """Last known state of the blinds."""

        movement: int = None
        blind: int = None  # in ms from top
        tilt: int = None  # in ms from horizontal (0) tilt
        target_blind: int = None  # INVALID_BLIND if not set
        target_tilt: int = None  # INVALID_TILT if not set

        def update_from_bytes(self, data):
            """Update all fields from a packed state message.

            ``data`` holds a uint8 movement followed by four little-endian
            int32 values: blind, tilt, target blind, target tilt.
            """
            movement, blind, tilt, tblind, ttilt = struct.unpack('<Biiii', data)
            self.movement = movement
            self.blind = blind
            self.tilt = tilt
            self.target_blind = tblind
            self.target_tilt = ttilt

        def update_from_event(self, event):
            """Copy movement, blind and tilt from a ``StateUpdateEvent``.

            Other event types are ignored; the targets are left untouched.
            """
            if isinstance(event, Blinds.StateUpdateEvent):
                self.movement = event.movement
                self.blind = event.blind
                self.tilt = event.tilt

    @dataclass
    class Config(Component.Config):
        """Configuration of the blinds; all timing values are in seconds."""

        pwr_pin: pin.Pin
        dir_pin: pin.Pin
        move_up_delay: float
        move_down_delay: float
        direction_change_delay: float
        tilt_time: float
        move_up_time: float
        move_down_time: float
        arrest_time: float
        backlash: float = 0.0
        dir_pin_inverted: bool = False
        calibrate_at_bottom: bool = False

        def __post_init__(self):
            # Treat an explicit None for either flag as False.
            if self.dir_pin_inverted is None:
                self.dir_pin_inverted = False
            if self.calibrate_at_bottom is None:
                self.calibrate_at_bottom = False

        def get_bytes(self):
            """Serialise the config.

            Layout: power pin number, direction pin number, flags byte
            (bit 0 = dir_pin_inverted, bit 1 = calibrate_at_bottom), then
            eight little-endian int32 timings in milliseconds.
            """
            times = [
                self.move_up_delay, self.move_down_delay, self.direction_change_delay,
                self.tilt_time, self.move_up_time, self.move_down_time, self.arrest_time,
                self.backlash
            ]
            args = [int(seconds * 1000) for seconds in times]
            timing = struct.pack('<iiiiiiii', *args)

            flags = (bool(self.dir_pin_inverted) << 0) | (bool(self.calibrate_at_bottom) << 1)
            return bytes([self.pwr_pin.number, self.dir_pin.number, flags]) + timing

        @classmethod
        def from_bytes(cls, data):
            """Build a config from bytes laid out as produced by :meth:`get_bytes`."""
            timing = struct.unpack('<iiiiiiii', data[3:])
            times = [ms / 1000 for ms in timing]

            return cls(pwr_pin=pin.Pin(data[0]),
                       dir_pin=pin.Pin(data[1]),
                       move_up_delay=times[0],
                       move_down_delay=times[1],
                       direction_change_delay=times[2],
                       tilt_time=times[3],
                       move_up_time=times[4],
                       move_down_time=times[5],
                       arrest_time=times[6],
                       backlash=times[7],
                       dir_pin_inverted=bool(data[2] & (1 << 0)),
                       calibrate_at_bottom=bool(data[2] & (1 << 1)))