import struct
from dataclasses import dataclass, field
from typing import List
from caspia.node import binary, config, errors, pollen_action
from caspia.node.events import BroadcastEvent
from caspia.node.pin import Pin
from .base import Component
GROUP_MAX = 0x3FF
[docs]class Relay(Component):
type = 0x01
[docs] @classmethod
def parse_broadcast(cls, broadcast):
if broadcast.is_component_broadcast:
if broadcast.component_data[0] == 0x01:
return Relay.StateOnEvent(broadcast)
elif broadcast.component_data[0] == 0x00:
return Relay.StateOffEvent(broadcast)
else:
raise errors.CaspiaError('Invalid broadcast')
else:
if broadcast.group == 0x51:
if broadcast.data[0] == 0x10:
return Relay.GroupOffEvent(broadcast)
elif broadcast.data[0] == 0x11:
return Relay.GroupOnEvent(broadcast)
elif broadcast.data[0] == 0x12:
return Relay.GroupToggleEvent(broadcast)
return None
[docs] @Component.does_request(lambda: b'\x10')
async def off(self, response):
self.state.update_from_bytes(response)
return self.state
[docs] @Component.does_request(lambda: b'\x11')
async def on(self, response):
self.state.update_from_bytes(response)
return self.state
[docs] @Component.does_request(lambda: b'\x12')
async def toggle(self, response):
self.state.update_from_bytes(response)
return self.state
[docs] @staticmethod
def group_request(command, group):
return 0x51, struct.pack('<BH', command, group)
[docs] @staticmethod
@pollen_action.broadcast(lambda group: Relay.group_request(0x10, group))
async def group_off(client, group):
await client.broadcast(*Relay.group_request(0x10, group))
[docs] @staticmethod
@pollen_action.broadcast(lambda group: Relay.group_request(0x11, group))
async def group_on(client, group):
await client.broadcast(*Relay.group_request(0x11, group))
[docs] @staticmethod
@pollen_action.broadcast(lambda group: Relay.group_request(0x12, group))
async def group_toggle(client, group):
await client.broadcast(*Relay.group_request(0x12, group))
[docs] class StateOnEvent(BroadcastEvent, Component.Event):
pass
[docs] class StateOffEvent(BroadcastEvent, Component.Event):
pass
[docs] @dataclass
class GroupOnEvent(BroadcastEvent):
group: int = field(init=False)
def __post_init__(self):
super().__post_init__()
self.group, = struct.unpack('<H', self.broadcast.data[1:3])
[docs] @dataclass
class GroupOffEvent(BroadcastEvent):
group: int = field(init=False)
def __post_init__(self):
super().__post_init__()
self.group, = struct.unpack('<H', self.broadcast.data[1:3])
[docs] @dataclass
class GroupToggleEvent(BroadcastEvent):
group: int = field(init=False)
def __post_init__(self):
super().__post_init__()
self.group, = struct.unpack('<H', self.broadcast.data[1:3])
[docs] class State(Component.State):
def __init__(self, is_on=None):
self.is_on = is_on
[docs] def update_from_bytes(self, data):
self.is_on = data[0] == 1
[docs] def update_from_event(self, event):
if isinstance(event, Relay.StateOnEvent):
self.is_on = True
elif isinstance(event, Relay.StateOffEvent):
self.is_on = False
def __repr_fields__(self):
if self.is_on is True:
state = 'on'
elif self.is_on is False:
state = 'off'
else:
state = 'unknown'
return state
[docs] @dataclass
class Config(Component.Config):
pin: Pin
inverted: bool = False
initial_state: bool = False
timeout: float = 0.0
group_memberships: List[int] = field(default_factory=list)
[docs] def get_bytes(self):
pinnum = self.pin.number
flags = self.inverted | (self.initial_state << 1)
timeout = int(self.timeout * 1000)
data = struct.pack('<BBI', pinnum, flags, timeout)
# pylint: disable=not-an-iterable
groups = binary.encode_list([m.get_bytes() for m in self.group_memberships])
return data + groups
[docs] @classmethod
def from_bytes(cls, data):
pinnum, flags, timeout = struct.unpack('<BBI', data[0:6])
if len(data) == 6:
groups = []
else:
groups = [
Relay.GroupMembership.from_bytes(b) for b in binary.decode_list(data[6:])[0]
]
return cls(Pin(pinnum),
inverted=bool(flags & 1),
initial_state=bool(flags & (1 << 1)),
timeout=timeout / 1000,
group_memberships=groups)
[docs] @dataclass
class GroupMembership(config.Config):
group: int
is_master: bool
def __post_init__(self):
super().__post_init__()
assert self.group <= GROUP_MAX
[docs] def get_bytes(self):
return struct.pack('<H', self.group | (self.is_master << 15))
[docs] @classmethod
def from_bytes(cls, data):
raw_group, = struct.unpack('<H', data)
return cls(group=raw_group & GROUP_MAX, is_master=bool(raw_group & (1 << 15)))