from caspia.gateway.services import GatewayService
from caspia.meadow.client import characteristic_read_handler, characteristic_write_handler
from caspia.meadow.services import BlindsBase
from caspia.node import components
class Blinds(GatewayService, BlindsBase):
    """Gateway service that exposes a ``components.Blinds`` node component.

    Characteristic reads and writes are forwarded to ``self.blinds_c``.
    Tilt values are translated between the component's relative tilt and
    an angle through the optional ``angle_mapping``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Both attributes start unset; presumably they are assigned by
        # configuration code outside this class -- TODO confirm.
        self.blinds_c: components.Blinds = None
        # Mapping of relative tilt -> angle. A falsy value (None / empty)
        # disables the translation and tilt values pass through unchanged.
        self.angle_mapping = None

    @property
    def dependant_components(self):
        """Return the set of components this service depends on."""
        return {self.blinds_c}

    def movement_to_str(self, movement):
        """Translate a component movement constant to a string.

        Returns ``'up'`` for ``MOVEMENT_UP``, ``'down'`` for
        ``MOVEMENT_DOWN`` and ``'steady'`` for any other value.
        """
        if movement == components.Blinds.MOVEMENT_UP:
            return 'up'
        elif movement == components.Blinds.MOVEMENT_DOWN:
            return 'down'
        else:
            return 'steady'

    def _angle_mapping_points(self):
        """Return the two ``(relative, angle)`` pairs with the smallest keys.

        Only these two entries define the linear conversion; any further
        entries of ``angle_mapping`` are ignored. Raises ``IndexError``
        when the mapping holds fewer than two entries.
        """
        points = sorted(self.angle_mapping.items(), key=lambda item: item[0])
        return points[0], points[1]

    def _relative_to_angle(self, relative):
        """Convert a relative tilt to an angle.

        The conversion is the straight line through the two points
        returned by ``_angle_mapping_points``. Without an angle mapping
        the input is returned unchanged.
        """
        if not self.angle_mapping:
            return relative
        (low_key, low_angle), (high_key, high_angle) = self._angle_mapping_points()
        angle_diff = high_angle - low_angle
        key_diff = high_key - low_key
        return ((relative - low_key) / key_diff) * angle_diff + low_angle

    def _angle_to_relative(self, angle):
        """Convert an angle to a relative tilt (inverse of ``_relative_to_angle``).

        Without an angle mapping the input is returned unchanged.
        """
        if not self.angle_mapping:
            return angle
        (low_key, low_angle), (high_key, high_angle) = self._angle_mapping_points()
        angle_diff = high_angle - low_angle
        key_diff = high_key - low_key
        return ((angle - low_angle) / angle_diff) * key_diff + low_key

    async def _state_blind_relative(self):
        """Return the relative blind of the current state, ``0.0`` if invalid."""
        state_blind = self.blinds_c.state.blind
        if state_blind == components.Blinds.INVALID_BLIND:
            return 0.0
        return await self.blinds_c.blind_to_relative(state_blind)

    async def _state_tilt_angle(self):
        """Return the tilt angle of the current state, ``0.0`` if invalid."""
        state_tilt = self.blinds_c.state.tilt
        if state_tilt == components.Blinds.INVALID_TILT:
            return 0.0
        relative_tilt = await self.blinds_c.tilt_to_relative(state_tilt)
        return self._relative_to_angle(relative_tilt)

    async def on_component_event(self, component, event):
        """Publish blind, tilt and movement after a blinds state update.

        Events from other components, or of other types, are ignored.
        """
        if not (component == self.blinds_c
                and isinstance(event, components.Blinds.StateUpdateEvent)):
            return
        # Use the same helpers as the read handlers so that a notified
        # value always matches what a read would return, including the
        # 0.0 substitute for INVALID_BLIND / INVALID_TILT.
        blind = await self._state_blind_relative()
        tilt = await self._state_tilt_angle()
        await self.notify(self.blind, blind, if_changed=True)
        await self.notify(self.tilt, tilt, if_changed=True)
        await self.notify(self.movement,
                          self.movement_to_str(self.blinds_c.state.movement),
                          if_changed=True)

    @characteristic_read_handler('movement')
    async def movement_read(self, **kwargs):
        """Return the current movement as a string, loading state if unset."""
        if self.blinds_c.state.movement is None:
            await self.blinds_c.load_state()
        return self.movement_to_str(self.blinds_c.state.movement)

    @characteristic_write_handler('move_up')
    async def move_up_write(self, value, **kwargs):
        """Ask the component to move up; ``value`` is not used."""
        await self.blinds_c.move_up()

    @characteristic_write_handler('move_down')
    async def move_down_write(self, value, **kwargs):
        """Ask the component to move down; ``value`` is not used."""
        await self.blinds_c.move_down()

    @characteristic_write_handler('stop_movement')
    async def stop_movement_write(self, value, **kwargs):
        """Ask the component to stop moving; ``value`` is not used."""
        await self.blinds_c.stop_movement()

    @characteristic_write_handler('calibrate')
    async def calibrate_write(self, value, **kwargs):
        """Ask the component to calibrate; ``value`` is not used."""
        await self.blinds_c.calibrate()

    @characteristic_read_handler('target_position')
    async def target_position_read(self, **kwargs):
        """Return ``[blind, tilt]`` as read from those two characteristics."""
        return [
            await self.characteristic_read(self.blind),
            await self.characteristic_read(self.tilt),
        ]

    @characteristic_write_handler('target_position')
    async def target_position_write(self, value, **kwargs):
        """Set the target position from a ``(blind, angle)`` pair.

        The angle is converted to a relative tilt before it is handed to
        the component.
        """
        blind, angle = value
        relative_tilt = self._angle_to_relative(angle)
        await self.blinds_c.set_target_position(blind, relative_tilt)

    @characteristic_read_handler('blind')
    async def blind_read(self, **kwargs):
        """Return the relative blind, or ``0.0`` when it is ``INVALID_BLIND``.

        The component state is loaded first when no blind value is known.
        """
        if self.blinds_c.state.blind is None:
            await self.blinds_c.load_state()
        return await self._state_blind_relative()

    @characteristic_read_handler('target_blind')
    async def target_blind_read(self, **kwargs):
        """Return the relative target blind.

        Falls back to reading the ``blind`` characteristic when the
        target is ``INVALID_BLIND``.
        """
        if self.blinds_c.state.target_blind is None:
            await self.blinds_c.load_state()
        if self.blinds_c.state.target_blind == components.Blinds.INVALID_BLIND:
            return await self.characteristic_read(self.blind, **kwargs)
        else:
            return await self.blinds_c.blind_to_relative(self.blinds_c.state.target_blind)

    @characteristic_write_handler('target_blind')
    async def target_blind_write(self, value, **kwargs):
        """Forward ``value`` to the component as the new target blind."""
        await self.blinds_c.set_target_blind(value)

    @characteristic_read_handler('tilt')
    async def tilt_read(self, **kwargs):
        """Return the tilt angle, or ``0.0`` when the tilt is ``INVALID_TILT``.

        The component state is loaded first when no tilt value is known.
        """
        if self.blinds_c.state.tilt is None:
            await self.blinds_c.load_state()
        return await self._state_tilt_angle()

    @characteristic_read_handler('target_tilt')
    async def target_tilt_read(self, **kwargs):
        """Return the target tilt as an angle.

        Falls back to reading the ``tilt`` characteristic when the target
        is ``INVALID_TILT``.
        """
        if self.blinds_c.state.target_tilt is None:
            await self.blinds_c.load_state()
        if self.blinds_c.state.target_tilt == components.Blinds.INVALID_TILT:
            return await self.characteristic_read(self.tilt, **kwargs)
        else:
            return self._relative_to_angle(await self.blinds_c.tilt_to_relative(
                self.blinds_c.state.target_tilt))

    @characteristic_write_handler('target_tilt')
    async def target_tilt_write(self, value, **kwargs):
        """Convert the angle ``value`` to a relative tilt and set it as target."""
        relative_tilt = self._angle_to_relative(value)
        await self.blinds_c.set_target_tilt(relative_tilt)