Source code for caspia.node.components.relay

import struct
from dataclasses import dataclass, field
from typing import List

from caspia.node import binary, config, errors, pollen_action
from caspia.node.events import BroadcastEvent
from caspia.node.pin import Pin

from .base import Component

# Largest relay group id (0x3FF, i.e. the low 10 bits set).
# Relay.GroupMembership checks its ``group`` against this bound on construction
# and uses the same value as the bit mask when decoding a membership word.
GROUP_MAX = 0x3FF


class Relay(Component):
    """Relay component: an on/off output that can also belong to relay groups.

    The class bundles the request helpers (``off``/``on``/``toggle``), the
    group broadcast helpers, the broadcast events the relay can emit, its
    cached :class:`State` and its binary :class:`Config`.
    """

    # Component type identifier (name dictated by the Component interface).
    type = 0x01

    @classmethod
    def parse_broadcast(cls, broadcast):
        """Translate a raw broadcast into the matching relay event.

        A component broadcast carries the new state in the first byte of
        ``component_data`` (``0x01`` on, ``0x00`` off).  Any other broadcast is
        only recognised when it was sent to group ``0x51`` and its first data
        byte is one of the group commands ``0x10``/``0x11``/``0x12``.

        :param broadcast: broadcast object exposing ``is_component_broadcast``,
            ``component_data``, ``group`` and ``data``.
        :returns: a ``StateOnEvent``/``StateOffEvent`` or one of the
            ``Group*Event`` instances, or ``None`` when the broadcast is not a
            relay group command.
        :raises errors.CaspiaError: when a component broadcast carries a state
            byte other than ``0x00`` or ``0x01``.
        """
        if broadcast.is_component_broadcast:
            state_byte = broadcast.component_data[0]
            if state_byte == 0x01:
                return Relay.StateOnEvent(broadcast)
            if state_byte == 0x00:
                return Relay.StateOffEvent(broadcast)
            raise errors.CaspiaError('Invalid broadcast')

        if broadcast.group != 0x51:
            return None

        command = broadcast.data[0]
        if command == 0x10:
            return Relay.GroupOffEvent(broadcast)
        if command == 0x11:
            return Relay.GroupOnEvent(broadcast)
        if command == 0x12:
            return Relay.GroupToggleEvent(broadcast)
        return None

    # NOTE(review): for the three request methods below, the decorator is
    # presumably what sends the request byte and supplies ``response`` --
    # confirm against ``Component.does_request``.

    @Component.does_request(lambda: b'\x10')
    async def off(self, response):
        """Handle the reply to the ``0x10`` (off) request.

        :param response: reply bytes; the first byte is the relay state.
        :returns: the updated ``self.state``.
        """
        self.state.update_from_bytes(response)
        return self.state

    @Component.does_request(lambda: b'\x11')
    async def on(self, response):
        """Handle the reply to the ``0x11`` (on) request.

        :param response: reply bytes; the first byte is the relay state.
        :returns: the updated ``self.state``.
        """
        self.state.update_from_bytes(response)
        return self.state

    @Component.does_request(lambda: b'\x12')
    async def toggle(self, response):
        """Handle the reply to the ``0x12`` (toggle) request.

        :param response: reply bytes; the first byte is the relay state.
        :returns: the updated ``self.state``.
        """
        self.state.update_from_bytes(response)
        return self.state

    @staticmethod
    def group_request(command, group):
        """Build the broadcast arguments for a relay group command.

        :param command: command byte (``0x10`` off, ``0x11`` on, ``0x12`` toggle).
        :param group: group id, packed as a little-endian unsigned 16-bit value.
        :returns: ``(0x51, payload)`` where ``payload`` is the command byte
            followed by the group id.
        :raises struct.error: when ``command`` or ``group`` do not fit their
            packed width.
        """
        return 0x51, struct.pack('<BH', command, group)

    @staticmethod
    @pollen_action.broadcast(lambda group: Relay.group_request(0x10, group))
    async def group_off(client, group):
        """Broadcast the "off" command (``0x10``) to relay group ``group``.

        :param client: object providing an awaitable ``broadcast(group, data)``.
        :param group: target relay group id.
        """
        await client.broadcast(*Relay.group_request(0x10, group))

    @staticmethod
    @pollen_action.broadcast(lambda group: Relay.group_request(0x11, group))
    async def group_on(client, group):
        """Broadcast the "on" command (``0x11``) to relay group ``group``.

        :param client: object providing an awaitable ``broadcast(group, data)``.
        :param group: target relay group id.
        """
        await client.broadcast(*Relay.group_request(0x11, group))

    @staticmethod
    @pollen_action.broadcast(lambda group: Relay.group_request(0x12, group))
    async def group_toggle(client, group):
        """Broadcast the "toggle" command (``0x12``) to relay group ``group``.

        :param client: object providing an awaitable ``broadcast(group, data)``.
        :param group: target relay group id.
        """
        await client.broadcast(*Relay.group_request(0x12, group))

    class StateOnEvent(BroadcastEvent, Component.Event):
        """Component broadcast whose state byte is ``0x01`` (relay on)."""

    class StateOffEvent(BroadcastEvent, Component.Event):
        """Component broadcast whose state byte is ``0x00`` (relay off)."""

    @dataclass
    class GroupOnEvent(BroadcastEvent):
        """Group broadcast carrying the "on" command (``0x11``)."""

        # Decoded from the broadcast payload, never passed to __init__.
        group: int = field(init=False)

        def __post_init__(self):
            super().__post_init__()
            # Payload layout: command byte, then little-endian uint16 group id.
            self.group, = struct.unpack('<H', self.broadcast.data[1:3])

    @dataclass
    class GroupOffEvent(BroadcastEvent):
        """Group broadcast carrying the "off" command (``0x10``)."""

        # Decoded from the broadcast payload, never passed to __init__.
        group: int = field(init=False)

        def __post_init__(self):
            super().__post_init__()
            # Payload layout: command byte, then little-endian uint16 group id.
            self.group, = struct.unpack('<H', self.broadcast.data[1:3])

    @dataclass
    class GroupToggleEvent(BroadcastEvent):
        """Group broadcast carrying the "toggle" command (``0x12``)."""

        # Decoded from the broadcast payload, never passed to __init__.
        group: int = field(init=False)

        def __post_init__(self):
            super().__post_init__()
            # Payload layout: command byte, then little-endian uint16 group id.
            self.group, = struct.unpack('<H', self.broadcast.data[1:3])

    class State(Component.State):
        """Last known relay state.

        ``is_on`` is ``True``/``False`` once known and ``None`` until then.
        """

        def __init__(self, is_on=None):
            self.is_on = is_on

        def update_from_bytes(self, data):
            """Set ``is_on`` from a response: on iff the first byte equals 1."""
            self.is_on = data[0] == 1

        def update_from_event(self, event):
            """Set ``is_on`` from a state event; other events are ignored."""
            if isinstance(event, Relay.StateOnEvent):
                self.is_on = True
            elif isinstance(event, Relay.StateOffEvent):
                self.is_on = False

        def __repr_fields__(self):
            """Return ``'on'``, ``'off'`` or ``'unknown'`` for the repr."""
            if self.is_on is True:
                return 'on'
            if self.is_on is False:
                return 'off'
            return 'unknown'

    @dataclass
    class Config(Component.Config):
        """Relay configuration and its binary encoding.

        Wire layout: pin number (uint8), flags (uint8; bit 0 = inverted,
        bit 1 = initial state), timeout (little-endian uint32, the configured
        value multiplied by 1000), followed by the encoded list of group
        memberships.
        """

        pin: Pin
        inverted: bool = False
        initial_state: bool = False
        # Scaled by 1000 on the wire -- presumably seconds stored as ms.
        timeout: float = 0.0
        # NOTE(review): elements are used as Relay.GroupMembership objects
        # (``get_bytes`` is called on each), although the annotation says int.
        # Left unchanged in case the base Config inspects annotations.
        group_memberships: List[int] = field(default_factory=list)

        def get_bytes(self):
            """Serialize the configuration to its binary form.

            :returns: the packed 6-byte header followed by the encoded
                membership list.
            :raises struct.error: when a field does not fit its packed width.
            """
            pinnum = self.pin.number
            flags = self.inverted | (self.initial_state << 1)
            # round() rather than int(): truncation turns a product that
            # lands just below a whole number (1.001 -> 1000.99...) into the
            # wrong tick count and breaks the from_bytes/get_bytes round trip.
            timeout = round(self.timeout * 1000)
            header = struct.pack('<BBI', pinnum, flags, timeout)
            # pylint: disable=not-an-iterable
            groups = binary.encode_list(
                [m.get_bytes() for m in self.group_memberships])
            return header + groups

        @classmethod
        def from_bytes(cls, data):
            """Build a configuration from its binary form.

            :param data: bytes produced by :meth:`get_bytes`; a bare 6-byte
                header means "no group memberships".
            :returns: a new ``Config`` instance.
            """
            pinnum, flags, timeout = struct.unpack('<BBI', data[0:6])
            if len(data) == 6:
                groups = []
            else:
                groups = [
                    Relay.GroupMembership.from_bytes(b)
                    for b in binary.decode_list(data[6:])[0]
                ]
            return cls(Pin(pinnum),
                       inverted=bool(flags & 1),
                       initial_state=bool(flags & (1 << 1)),
                       timeout=timeout / 1000,
                       group_memberships=groups)

    @dataclass
    class GroupMembership(config.Config):
        """Membership of a relay in one group.

        Encoded as a little-endian uint16: the group id in the low bits
        (masked with ``GROUP_MAX``) and the master flag in bit 15.
        """

        group: int
        is_master: bool

        def __post_init__(self):
            super().__post_init__()
            # Explicit check instead of ``assert`` so the validation also
            # runs under ``python -O``; ids above GROUP_MAX would corrupt
            # the encoded word.
            if not 0 <= self.group <= GROUP_MAX:
                raise ValueError(
                    'group must be in range 0..%d, got %r'
                    % (GROUP_MAX, self.group))

        def get_bytes(self):
            """Return the 2-byte encoding of this membership."""
            return struct.pack('<H', self.group | (self.is_master << 15))

        @classmethod
        def from_bytes(cls, data):
            """Decode a membership from exactly two bytes.

            :raises struct.error: when ``data`` is not two bytes long.
            """
            raw_group, = struct.unpack('<H', data)
            return cls(group=raw_group & GROUP_MAX,
                       is_master=bool(raw_group & (1 << 15)))