Source code for caspia.gateway.services.door

from caspia.gateway.services import GatewayService
from caspia.meadow.client import characteristic_read_handler
from caspia.meadow.errors import NotSupportedError
from caspia.meadow.services import DoorBase
from caspia.node import components


class Door(GatewayService, DoorBase):
    """Door service whose characteristics are backed by digital inputs.

    Two optional ``components.DigitalInput`` components may be attached via
    the service configuration:

    * ``handle_input`` -- backs the ``is_handle_down`` characteristic.
    * ``opened_input`` -- backs the ``is_open`` characteristic.

    Reading a characteristic whose input has not been configured raises
    ``NotSupportedError``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Both inputs stay unset until configure() resolves them.
        self.opened_input_c = None
        self.handle_input_c = None

    def configure(self):
        """Resolve the input components named in ``self.config``.

        For each of the ``handle_input`` and ``opened_input`` keys present
        in the configuration, the matching ``DigitalInput`` component is
        looked up on this service's node and stored on the instance. A key
        that is absent leaves the corresponding attribute untouched.
        """
        # TODO: remove characteristics
        bindings = (
            ('handle_input', 'handle_input_c'),
            ('opened_input', 'opened_input_c'),
        )
        for config_key, attribute in bindings:
            if config_key in self.config:
                setattr(
                    self,
                    attribute,
                    self.network.get_configured_component(
                        self.get_node(),
                        components.DigitalInput,
                        self.config[config_key],
                    ),
                )

    @property
    def dependant_components(self):
        """Return the set of input components currently attached."""
        candidates = (self.handle_input_c, self.opened_input_c)
        return {candidate for candidate in candidates if candidate}

    async def on_component_event(self, component, event):
        """Publish a characteristic update for an event on one of the inputs.

        The opened-input is checked first, then the handle-input; events
        from any other component are ignored. The notification carries the
        component's current state value coerced to ``bool`` and is sent
        with ``if_changed=True``. The ``event`` argument itself is not
        inspected.
        """
        if component == self.opened_input_c:
            characteristic, source = self.is_open, self.opened_input_c
        elif component == self.handle_input_c:
            characteristic, source = self.is_handle_down, self.handle_input_c
        else:
            return
        await self.notify(
            characteristic, bool(source.state.value), if_changed=True)

    @characteristic_read_handler('is_open')
    async def is_open_read(self, **kwargs):
        """Return the opened-input state as a ``bool``.

        If the cached state value is ``None`` the component state is loaded
        first.

        Raises:
            NotSupportedError: if no opened-input is attached.
        """
        if not self.opened_input_c:
            raise NotSupportedError('is_open is not supported')
        if self.opened_input_c.state.value is None:
            await self.opened_input_c.load_state()
        return bool(self.opened_input_c.state.value)

    @characteristic_read_handler('is_handle_down')
    async def is_handle_down_read(self, **kwargs):
        """Return the handle-input state as a ``bool``.

        If the cached state value is ``None`` the component state is loaded
        first.

        Raises:
            NotSupportedError: if no handle-input is attached.
        """
        if not self.handle_input_c:
            raise NotSupportedError('is_handle_down is not supported')
        if self.handle_input_c.state.value is None:
            await self.handle_input_c.load_state()
        return bool(self.handle_input_c.state.value)