Source code for caspia.pan.rules.blindscontrol

import asyncio
import logging
import math

from caspia.meadow.client.gateway import ServiceGatewayMixin
from caspia.meadow.rules import Rule
from caspia.meadow.services.blindscontrol import BlindsControlBase
from caspia.pan.reactive.sun import get_sun
from caspia.toolbox.managed_task import managed_task
from caspia.toolbox.storage import ProxyStorage, storage_property

from .base import RuleActivator, RuleState

logger = logging.getLogger(__name__)


[docs]class BlindsControlRule(Rule): def __init__(self, horizontal_view, vertical_view, blinds_seg_width, blinds_seg_spacing, blinds, sun=None, identifier=None): """ Identifier will become the BlindsControl's service name. Name of the rule will be filled automatically. """ super().__init__(identifier=identifier, name=f'BlindsControl {identifier}') self.horizontal_view = horizontal_view self.vertical_view = vertical_view self.blinds_seg_width = blinds_seg_width self.blinds_seg_spacing = blinds_seg_spacing self.sun = sun or get_sun() self.blinds = blinds
[docs]class BlindsControlRuleActivator(RuleActivator): def __init__(self, rule): super().__init__(rule) self.rule: BlindsControlRule self.service: 'BlindsControl' = None self.target_blind = None self.target_tilt = None self._periodic_task = None self._blinds_write_subscription = None
[docs] @classmethod def can_activate_rule(cls, rule): return isinstance(rule, BlindsControlRule)
[docs] async def do_activate(self): from caspia.pan import get_global_pan # Publish service storage = ProxyStorage(get_global_pan().storage, f'rule|{self.rule.identifier}|') service = BlindsControl(self.rule.identifier, storage=storage) service.mode.subscribe(self._on_mode_change) get_global_pan().gateway.add(service) self._blinds_write_subscription = \ get_global_pan().consumer_conn.listen_writes(self.rule.blinds.name, None, self._on_blinds_characteristic_write) self.service = service # Start periodic updates self._periodic_task = asyncio.ensure_future(self._run_periodically(60.0)) # Rule should be ready by now self.state = RuleState.READY, None
[docs] async def do_deactivate(self): from caspia.pan import get_global_pan # remove the published service get_global_pan().gateway.remove(self.service) self._blinds_write_subscription.unsubscribe() self.service = None # cancel periodic updates self._periodic_task.cancel() self._periodic_task = None # change state back to SETUP self.state = RuleState.SETUP, 'Rule was deactivated'
async def _run_periodically(self, interval): while True: self.do_control.schedule() # pylint: disable=no-member await asyncio.sleep(interval) async def _on_blinds_characteristic_write(self, value, extra, characteristic, **kwargs): blinds = self.rule.blinds target_chars = blinds.target_tilt.name, blinds.target_blind.name, blinds.move_up.name, \ blinds.move_down.name, blinds.stop_movement.name if characteristic in target_chars and extra.get('source') != self.service.name: await self.service.mode.write('manual') logger.info('Switching %s to manual mode because of external write', self.service.name) async def _on_mode_change(self, mode, **kwargs): self.do_control.schedule() # pylint: disable=no-member async def _set_blinds_position(self, blind, tilt): self.target_blind = blind self.target_tilt = tilt if blind is not None or tilt is not None: extra = {'source': self.service.name} await self.rule.blinds.target_blind.write(blind, extra=extra) await self.rule.blinds.target_tilt.write(tilt, extra=extra) @managed_task() async def do_control(self): with self.running_status(): mode = BlindsControl.Mode(await self.service.mode.read()) if mode == BlindsControl.Mode.MANUAL: await self._set_blinds_position(None, None) elif mode in [ BlindsControl.Mode.BLOCK_DIRECT_SUN_UP, BlindsControl.Mode.BLOCK_DIRECT_SUN_DOWN ]: await self._do_block_direct_sun(mode) async def _do_block_direct_sun(self, mode): sun_azimuth, sun_elevation = await asyncio.gather(self.rule.sun.azimuth.read(), self.rule.sun.elevation.read()) horizontal_range = AngleRange(*self.rule.horizontal_view) vertical_range = AngleRange(*self.rule.vertical_view) if sun_azimuth in horizontal_range and sun_elevation in vertical_range: # calculate needed tilt tilt_r = Calc.blinds_blocking_angle(self.rule.blinds_seg_width, self.rule.blinds_seg_spacing, math.radians(sun_elevation)) await self._set_blinds_position(1.0, math.degrees(tilt_r)) elif mode == BlindsControl.Mode.BLOCK_DIRECT_SUN_UP: await self._set_blinds_position(0.0, -90.0)
[docs]class BlindsControl(ServiceGatewayMixin, BlindsControlBase): _mode = storage_property('_mode', default=BlindsControlBase.Mode.MANUAL, serialize=lambda mode: mode.value, deserialize=BlindsControlBase.Mode) def __init__(self, name, storage): self.storage = storage super().__init__(name)
[docs] async def characteristic_write(self, characteristic, value, **kwargs): if characteristic is self.mode: self._mode = self.Mode(value) await self.notify(self.mode, self._mode.value, if_changed=True) else: return super().characteristic_write(characteristic, value, **kwargs)
[docs] async def characteristic_read(self, characteristic, **kwargs): if characteristic is self.mode: return self._mode.value else: return super().characteristic_read(characteristic, **kwargs)
[docs]class AngleRange: def __init__(self, start, end): self.start = start self.end = end def __contains__(self, angle): if self.start <= self.end: return self.start <= angle <= self.end else: return self.start <= angle <= 360.0 or 0.0 <= angle <= self.end
[docs]class Calc:
[docs] @staticmethod def triangle_c(a, b, gama): """ Calculate length of third side given two sides and angle between them. """ return math.sqrt(a**2 + b**2 - 2 * a * b * math.cos(gama))
[docs] @staticmethod def triangle_beta(a, b, c, gama): """ Calculates angle beta of a triangle with given edge lengths a and b and given angle gama (in radians). """ return math.acos((2 * (b**2) - 2 * a * b * math.cos(gama)) / (2 * b * c))
[docs] @staticmethod def blind_angle_(width, spacing, angle): beta = Calc.triangle_beta(spacing, width, Calc.triangle_c(spacing, width, math.radians(90 - angle)), math.radians(90 - angle)) return 90 - (180 - (90 - angle) - math.degrees(beta))
[docs] @staticmethod def blinds_current_view(width, spacing, angle): """ Given blind's width, spacing and current angle, calculate the view from inside out. :param width: width of the blind's segments :param spacing: vertical spacing of segments :param angle: current angle of the segments in radians :returns: range of vertical view (tuple of two angles) """ def calc(a): beta = Calc.triangle_beta(a=spacing, b=width, c=Calc.triangle_c(spacing, width, math.pi / 2 - a), gama=math.pi / 2 - a) return math.pi / 2 - (math.pi - (math.pi / 2 - a) - beta) return calc(angle), calc(math.pi - angle)
[docs] @staticmethod def blinds_blocking_angle(width, spacing, angle): """ Given a current position of a sun and size of blind's segments, calculate the minimum blind's angle blocking all sunlight. """ xi = math.pi / 2 - angle beta = math.asin((spacing * math.sin(xi)) / width) return -angle + beta