import asyncio
import logging
import math
from caspia.meadow.client.gateway import ServiceGatewayMixin
from caspia.meadow.rules import Rule
from caspia.meadow.services.blindscontrol import BlindsControlBase
from caspia.pan.reactive.sun import get_sun
from caspia.toolbox.managed_task import managed_task
from caspia.toolbox.storage import ProxyStorage, storage_property
from .base import RuleActivator, RuleState
logger = logging.getLogger(__name__)
[docs]class BlindsControlRule(Rule):
def __init__(self,
horizontal_view,
vertical_view,
blinds_seg_width,
blinds_seg_spacing,
blinds,
sun=None,
identifier=None):
"""
Identifier will become the BlindsControl's service name.
Name of the rule will be filled automatically.
"""
super().__init__(identifier=identifier, name=f'BlindsControl {identifier}')
self.horizontal_view = horizontal_view
self.vertical_view = vertical_view
self.blinds_seg_width = blinds_seg_width
self.blinds_seg_spacing = blinds_seg_spacing
self.sun = sun or get_sun()
self.blinds = blinds
[docs]class BlindsControlRuleActivator(RuleActivator):
def __init__(self, rule):
super().__init__(rule)
self.rule: BlindsControlRule
self.service: 'BlindsControl' = None
self.target_blind = None
self.target_tilt = None
self._periodic_task = None
self._blinds_write_subscription = None
[docs] @classmethod
def can_activate_rule(cls, rule):
return isinstance(rule, BlindsControlRule)
[docs] async def do_activate(self):
from caspia.pan import get_global_pan
# Publish service
storage = ProxyStorage(get_global_pan().storage, f'rule|{self.rule.identifier}|')
service = BlindsControl(self.rule.identifier, storage=storage)
service.mode.subscribe(self._on_mode_change)
get_global_pan().gateway.add(service)
self._blinds_write_subscription = \
get_global_pan().consumer_conn.listen_writes(self.rule.blinds.name, None,
self._on_blinds_characteristic_write)
self.service = service
# Start periodic updates
self._periodic_task = asyncio.ensure_future(self._run_periodically(60.0))
# Rule should be ready by now
self.state = RuleState.READY, None
[docs] async def do_deactivate(self):
from caspia.pan import get_global_pan
# remove the published service
get_global_pan().gateway.remove(self.service)
self._blinds_write_subscription.unsubscribe()
self.service = None
# cancel periodic updates
self._periodic_task.cancel()
self._periodic_task = None
# change state back to SETUP
self.state = RuleState.SETUP, 'Rule was deactivated'
async def _run_periodically(self, interval):
while True:
self.do_control.schedule() # pylint: disable=no-member
await asyncio.sleep(interval)
async def _on_blinds_characteristic_write(self, value, extra, characteristic, **kwargs):
blinds = self.rule.blinds
target_chars = blinds.target_tilt.name, blinds.target_blind.name, blinds.move_up.name, \
blinds.move_down.name, blinds.stop_movement.name
if characteristic in target_chars and extra.get('source') != self.service.name:
await self.service.mode.write('manual')
logger.info('Switching %s to manual mode because of external write', self.service.name)
async def _on_mode_change(self, mode, **kwargs):
self.do_control.schedule() # pylint: disable=no-member
async def _set_blinds_position(self, blind, tilt):
self.target_blind = blind
self.target_tilt = tilt
if blind is not None or tilt is not None:
extra = {'source': self.service.name}
await self.rule.blinds.target_blind.write(blind, extra=extra)
await self.rule.blinds.target_tilt.write(tilt, extra=extra)
@managed_task()
async def do_control(self):
with self.running_status():
mode = BlindsControl.Mode(await self.service.mode.read())
if mode == BlindsControl.Mode.MANUAL:
await self._set_blinds_position(None, None)
elif mode in [
BlindsControl.Mode.BLOCK_DIRECT_SUN_UP, BlindsControl.Mode.BLOCK_DIRECT_SUN_DOWN
]:
await self._do_block_direct_sun(mode)
async def _do_block_direct_sun(self, mode):
sun_azimuth, sun_elevation = await asyncio.gather(self.rule.sun.azimuth.read(),
self.rule.sun.elevation.read())
horizontal_range = AngleRange(*self.rule.horizontal_view)
vertical_range = AngleRange(*self.rule.vertical_view)
if sun_azimuth in horizontal_range and sun_elevation in vertical_range:
# calculate needed tilt
tilt_r = Calc.blinds_blocking_angle(self.rule.blinds_seg_width,
self.rule.blinds_seg_spacing,
math.radians(sun_elevation))
await self._set_blinds_position(1.0, math.degrees(tilt_r))
elif mode == BlindsControl.Mode.BLOCK_DIRECT_SUN_UP:
await self._set_blinds_position(0.0, -90.0)
[docs]class BlindsControl(ServiceGatewayMixin, BlindsControlBase):
_mode = storage_property('_mode',
default=BlindsControlBase.Mode.MANUAL,
serialize=lambda mode: mode.value,
deserialize=BlindsControlBase.Mode)
def __init__(self, name, storage):
self.storage = storage
super().__init__(name)
[docs] async def characteristic_write(self, characteristic, value, **kwargs):
if characteristic is self.mode:
self._mode = self.Mode(value)
await self.notify(self.mode, self._mode.value, if_changed=True)
else:
return super().characteristic_write(characteristic, value, **kwargs)
[docs] async def characteristic_read(self, characteristic, **kwargs):
if characteristic is self.mode:
return self._mode.value
else:
return super().characteristic_read(characteristic, **kwargs)
[docs]class AngleRange:
def __init__(self, start, end):
self.start = start
self.end = end
def __contains__(self, angle):
if self.start <= self.end:
return self.start <= angle <= self.end
else:
return self.start <= angle <= 360.0 or 0.0 <= angle <= self.end
[docs]class Calc:
[docs] @staticmethod
def triangle_c(a, b, gama):
""" Calculate length of third side given two sides and angle between them. """
return math.sqrt(a**2 + b**2 - 2 * a * b * math.cos(gama))
[docs] @staticmethod
def triangle_beta(a, b, c, gama):
"""
Calculates angle beta of a triangle with given edge lengths a and b
and given angle gama (in radians).
"""
return math.acos((2 * (b**2) - 2 * a * b * math.cos(gama)) / (2 * b * c))
[docs] @staticmethod
def blind_angle_(width, spacing, angle):
beta = Calc.triangle_beta(spacing, width,
Calc.triangle_c(spacing, width, math.radians(90 - angle)),
math.radians(90 - angle))
return 90 - (180 - (90 - angle) - math.degrees(beta))
[docs] @staticmethod
def blinds_current_view(width, spacing, angle):
"""
Given blind's width, spacing and current angle,
calculate the view from inside out.
:param width: width of the blind's segments
:param spacing: vertical spacing of segments
:param angle: current angle of the segments in radians
:returns: range of vertical view (tuple of two angles)
"""
def calc(a):
beta = Calc.triangle_beta(a=spacing,
b=width,
c=Calc.triangle_c(spacing, width, math.pi / 2 - a),
gama=math.pi / 2 - a)
return math.pi / 2 - (math.pi - (math.pi / 2 - a) - beta)
return calc(angle), calc(math.pi - angle)
[docs] @staticmethod
def blinds_blocking_angle(width, spacing, angle):
"""
Given a current position of a sun and size of blind's segments,
calculate the minimum blind's angle blocking all sunlight.
"""
xi = math.pi / 2 - angle
beta = math.asin((spacing * math.sin(xi)) / width)
return -angle + beta