Source code for caspia.gateway.services.blinds

from caspia.gateway.services import GatewayService
from caspia.meadow.client import characteristic_read_handler, characteristic_write_handler
from caspia.meadow.services import BlindsBase
from caspia.node import components


class Blinds(GatewayService, BlindsBase):
    """Gateway service that bridges a node ``Blinds`` component to characteristics.

    Blind positions are exchanged with the component in its "relative" scale
    (via ``blind_to_relative`` / ``tilt_to_relative``).  Tilt values are
    additionally converted between that relative scale and an angle using the
    optional ``angle_mapping`` configuration entry.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Both attributes are populated by configure().
        self.blinds_c: components.Blinds = None
        self.angle_mapping = None

    def configure(self):
        """Look up the configured component and read the optional angle mapping."""
        self.blinds_c = self.network.get_configured_component(
            self.get_node(), components.Blinds, self.config['component'])
        # Absent key -> None, which disables the relative <-> angle conversion.
        self.angle_mapping = self.config.get('angle_mapping')

    @property
    def dependant_components(self):
        """Return the set of components this service depends on."""
        return {self.blinds_c}

    def movement_to_str(self, movement):
        """Translate a component movement constant to ``'up'``/``'down'``/``'steady'``.

        Any value other than ``MOVEMENT_UP`` or ``MOVEMENT_DOWN`` (including
        ``None``) is reported as ``'steady'``.
        """
        if movement == components.Blinds.MOVEMENT_UP:
            return 'up'
        if movement == components.Blinds.MOVEMENT_DOWN:
            return 'down'
        return 'steady'

    def _mapping_endpoints(self):
        """Return ``(rel0, angle0, rel1, angle1)`` for the two lowest mapping keys.

        Only the two entries with the smallest keys are used; further entries
        are ignored.  Raises ``IndexError`` when the mapping has a single entry.
        """
        ordered_keys = sorted(self.angle_mapping)
        low_key, high_key = ordered_keys[0], ordered_keys[1]
        return low_key, self.angle_mapping[low_key], high_key, self.angle_mapping[high_key]

    def _relative_to_angle(self, relative):
        """Convert a relative tilt to an angle by linear interpolation.

        Returns ``relative`` unchanged when no (or an empty) mapping is set.
        """
        if not self.angle_mapping:
            return relative
        rel0, angle0, rel1, angle1 = self._mapping_endpoints()
        return ((relative - rel0) / (rel1 - rel0)) * (angle1 - angle0) + angle0

    def _angle_to_relative(self, angle):
        """Convert an angle to a relative tilt; inverse of ``_relative_to_angle``.

        Returns ``angle`` unchanged when no (or an empty) mapping is set.
        Raises ``ZeroDivisionError`` if both mapped angles are equal.
        """
        if not self.angle_mapping:
            return angle
        rel0, angle0, rel1, angle1 = self._mapping_endpoints()
        return ((angle - angle0) / (angle1 - angle0)) * (rel1 - rel0) + rel0

    async def _state_blind_relative(self):
        """Return the current blind position, or 0.0 for the invalid sentinel."""
        raw_blind = self.blinds_c.state.blind
        if raw_blind == components.Blinds.INVALID_BLIND:
            return 0.0
        return await self.blinds_c.blind_to_relative(raw_blind)

    async def _state_tilt_angle(self):
        """Return the current tilt as an angle, or 0.0 for the invalid sentinel."""
        raw_tilt = self.blinds_c.state.tilt
        if raw_tilt == components.Blinds.INVALID_TILT:
            return 0.0
        relative_tilt = await self.blinds_c.tilt_to_relative(raw_tilt)
        return self._relative_to_angle(relative_tilt)

    async def on_component_event(self, component, event):
        """Publish blind, tilt and movement after a state update of our component.

        Events from other components, or of other types, are ignored.
        """
        is_state_update = (component == self.blinds_c
                           and isinstance(event, components.Blinds.StateUpdateEvent))
        if not is_state_update:
            return

        # Use the same sentinel handling as the read handlers so a pushed value
        # never disagrees with what a subsequent read would return.
        blind = await self._state_blind_relative()
        tilt = await self._state_tilt_angle()
        movement = self.movement_to_str(self.blinds_c.state.movement)

        await self.notify(self.blind, blind, if_changed=True)
        await self.notify(self.tilt, tilt, if_changed=True)
        await self.notify(self.movement, movement, if_changed=True)

    @characteristic_read_handler('movement')
    async def movement_read(self, **kwargs):
        """Return the movement as a string, loading state first if it is unknown."""
        if self.blinds_c.state.movement is None:
            await self.blinds_c.load_state()
        return self.movement_to_str(self.blinds_c.state.movement)

    @characteristic_write_handler('move_up')
    async def move_up_write(self, value, **kwargs):
        """Start moving up; the written ``value`` is ignored."""
        await self.blinds_c.move_up()

    @characteristic_write_handler('move_down')
    async def move_down_write(self, value, **kwargs):
        """Start moving down; the written ``value`` is ignored."""
        await self.blinds_c.move_down()

    @characteristic_write_handler('stop_movement')
    async def stop_movement_write(self, value, **kwargs):
        """Stop any movement; the written ``value`` is ignored."""
        await self.blinds_c.stop_movement()

    @characteristic_write_handler('calibrate')
    async def calibrate_write(self, value, **kwargs):
        """Trigger calibration; the written ``value`` is ignored."""
        await self.blinds_c.calibrate()

    @characteristic_read_handler('target_position')
    async def target_position_read(self, **kwargs):
        """Return ``[blind, tilt]`` as read from the ``blind`` and ``tilt`` characteristics."""
        # NOTE(review): this reads the *current* blind/tilt rather than
        # target_blind/target_tilt -- confirm that this is intended.
        return [
            await self.characteristic_read(self.blind),
            await self.characteristic_read(self.tilt),
        ]

    @characteristic_write_handler('target_position')
    async def target_position_write(self, value, **kwargs):
        """Set the target from a ``(blind, angle)`` pair; the angle is mapped to relative."""
        blind, angle = value
        relative_tilt = self._angle_to_relative(angle)
        await self.blinds_c.set_target_position(blind, relative_tilt)

    @characteristic_read_handler('blind')
    async def blind_read(self, **kwargs):
        """Return the current blind position (0.0 when the component reports it invalid)."""
        if self.blinds_c.state.blind is None:
            await self.blinds_c.load_state()
        return await self._state_blind_relative()

    @characteristic_read_handler('target_blind')
    async def target_blind_read(self, **kwargs):
        """Return the target blind position, falling back to the current one if invalid."""
        if self.blinds_c.state.target_blind is None:
            await self.blinds_c.load_state()
        target = self.blinds_c.state.target_blind
        if target == components.Blinds.INVALID_BLIND:
            return await self.characteristic_read(self.blind, **kwargs)
        return await self.blinds_c.blind_to_relative(target)

    @characteristic_write_handler('target_blind')
    async def target_blind_write(self, value, **kwargs):
        """Forward the written value to the component as the new target blind."""
        await self.blinds_c.set_target_blind(value)

    @characteristic_read_handler('tilt')
    async def tilt_read(self, **kwargs):
        """Return the current tilt angle (0.0 when the component reports it invalid)."""
        if self.blinds_c.state.tilt is None:
            await self.blinds_c.load_state()
        return await self._state_tilt_angle()

    @characteristic_read_handler('target_tilt')
    async def target_tilt_read(self, **kwargs):
        """Return the target tilt angle, falling back to the current tilt if invalid."""
        if self.blinds_c.state.target_tilt is None:
            await self.blinds_c.load_state()
        target = self.blinds_c.state.target_tilt
        if target == components.Blinds.INVALID_TILT:
            return await self.characteristic_read(self.tilt, **kwargs)
        relative_tilt = await self.blinds_c.tilt_to_relative(target)
        return self._relative_to_angle(relative_tilt)

    @characteristic_write_handler('target_tilt')
    async def target_tilt_write(self, value, **kwargs):
        """Map the written angle to a relative tilt and set it as the target."""
        relative_tilt = self._angle_to_relative(value)
        await self.blinds_c.set_target_tilt(relative_tilt)