Source code for caspia.node.components.button
import struct
from dataclasses import dataclass, field
from typing import ClassVar, List
from caspia.node import Pin, binary, config, errors
from caspia.node.events import BroadcastEvent
from .base import Component
class Button(Component):
    """Button component (component type ``0x02``).

    Bundles the broadcast event, state and configuration classes that belong
    to a button, together with the binary encoding/decoding of each.
    """

    type = 0x02

    @classmethod
    def parse_broadcast(cls, broadcast):
        """Build the event object matching a received broadcast.

        The first byte of ``broadcast.component_data`` selects the event
        class: ``0x00`` -> ``PushEvent``, ``0x01`` -> ``ReleaseEvent``,
        ``0x10`` -> ``ButtonEvent``.

        Raises:
            errors.CaspiaError: if the first byte is none of the above.
        """
        # NOTE(review): Button.PushEvent and Button.ReleaseEvent are referenced
        # here but are not defined in the part of the file visible to this
        # review -- confirm they are provided elsewhere.
        kind = broadcast.component_data[0]
        if kind == 0x00:
            return Button.PushEvent(broadcast)
        elif kind == 0x01:
            return Button.ReleaseEvent(broadcast)
        elif kind == 0x10:
            return Button.ButtonEvent(broadcast)
        else:
            raise errors.CaspiaError('Invalid broadcast')

    @dataclass
    class ButtonEvent(BroadcastEvent, Component.Event):
        """Broadcasted on Hold, Click, etc ..."""

        # Both fields are filled in by __post_init__ from the broadcast payload.
        event: int = field(init=False)
        interval: float = field(init=False)

        EVENT_CLICK: ClassVar[int] = 0x01
        EVENT_HOLD: ClassVar[int] = 0x02

        @staticmethod
        def create_raw_broadcast_data(component_id, evt_type, interval=None):
            """Build the raw bytes of a click or hold broadcast.

            Layout: ``component_id``, ``0x10``, event code, and -- for a hold
            only -- ``interval * 1000`` truncated to an unsigned 16-bit
            little-endian integer.

            Args:
                component_id: value of the first output byte.
                evt_type: ``EVENT_CLICK`` or ``EVENT_HOLD``.
                interval: hold interval; required for ``EVENT_HOLD`` and
                    ignored for ``EVENT_CLICK``.

            Raises:
                ValueError: if ``evt_type`` is unknown, or if ``EVENT_HOLD``
                    is requested without an ``interval``.
            """
            data = bytes([component_id, 0x10])
            if evt_type == Button.ButtonEvent.EVENT_CLICK:
                data += b'\x01'
            elif evt_type == Button.ButtonEvent.EVENT_HOLD:
                if interval is None:
                    # Previously surfaced as an opaque TypeError from
                    # ``None * 1000``; fail with an explicit message instead.
                    raise ValueError('interval is required for EVENT_HOLD.')
                data += struct.pack('<BH', 0x02, int(interval * 1000))
            else:
                raise ValueError('Invalid evt_type.')
            return data

        def __post_init__(self):
            """Decode ``event`` and ``interval`` from the broadcast payload.

            Byte 1 of ``self.broadcast.component_data`` is the event code.
            For a hold, bytes 2-3 hold an unsigned 16-bit little-endian value
            that is divided by 1000 to give ``interval``; for a click the
            interval is 0.

            Raises:
                ValueError: if the event code is neither click nor hold.
            """
            super().__post_init__()
            payload = self.broadcast.component_data
            self.event = payload[1]
            if self.event == Button.ButtonEvent.EVENT_HOLD:
                self.interval = struct.unpack('<H', payload[2:4])[0] / 1000
            elif self.event == Button.ButtonEvent.EVENT_CLICK:
                self.interval = 0
            else:
                raise ValueError('Invalid broadcast data')

    @dataclass
    class State(Component.State):
        """Pushed/released state of the button."""

        # Stays None until a push or release event has been applied.
        is_pushed: bool = None

        def update_from_event(self, event):
            """Set ``is_pushed`` from a push or release event.

            Events of any other type leave the state unchanged.
            """
            if isinstance(event, Button.PushEvent):
                self.is_pushed = True
            elif isinstance(event, Button.ReleaseEvent):
                self.is_pushed = False

    @dataclass
    class Config(Component.Config):
        """Button configuration: the pin plus a list of event entries."""

        pin: Pin
        events: List['EventConfig'] = field(default_factory=list)

        def get_bytes(self):
            """Serialize the configuration.

            Output is one byte with ``pin.number`` followed by the event
            entries encoded through ``binary.encode_list``. Entries are
            written in descending order of their encoded trigger value.
            """
            header = bytes([self.pin.number])
            sorted_events = sorted(self.events,
                                   key=lambda evt: evt.encode_trigger(evt.trigger),
                                   reverse=True)
            events_data = binary.encode_list([evt.get_bytes() for evt in sorted_events])
            return header + events_data

        @classmethod
        def from_bytes(cls, data):
            """Deserialize a configuration produced by ``get_bytes``.

            Byte 0 is passed to ``Pin``; the remainder is split with
            ``binary.decode_list`` and each entry is decoded as an
            ``EventConfig``.
            """
            event_entries, _ = binary.decode_list(data[1:])
            # Distinct loop name so the ``data`` parameter is not shadowed.
            events = [Button.EventConfig.from_bytes(entry) for entry in event_entries]
            return cls(pin=Pin(data[0]), events=events)

    @dataclass
    class EventConfig(config.Config):
        """One configured reaction to a button trigger.

        Binary layout (little-endian, ``'<HBHB'`` header followed by payload):
        encoded trigger (uint16), flags (uint8), ``addr`` (uint16), payload
        length (uint8), then ``data``. Flag bits: bit 7 ``when_online``,
        bit 6 ``when_offline``, bit 5 ``canceling``; the two lowest bits are
        read back as ``evt_type``.
        """

        # Sentinel trigger values; stored and transmitted unchanged.
        TRIGGER_PUSH: ClassVar[int] = 0xFFFE
        TRIGGER_RELEASE: ClassVar[int] = 0xFFFF

        TYPE_REQUEST: ClassVar[int] = 0x00
        TYPE_BROADCAST: ClassVar[int] = 0x01

        trigger: int
        evt_type: int
        addr: int
        data: bytes
        canceling: bool = True
        when_online: bool = True
        when_offline: bool = True

        @classmethod
        def encode_trigger(cls, trigger):
            """Return the 16-bit wire value for ``trigger``.

            The push/release sentinels are returned as they are; any other
            value is multiplied by 1000 and truncated to an int
            (presumably seconds -> milliseconds; confirm with the protocol).
            """
            if trigger in [cls.TRIGGER_PUSH, cls.TRIGGER_RELEASE]:
                return trigger
            else:
                return int(trigger * 1000)

        @classmethod
        def decode_trigger(cls, raw):
            """Inverse of ``encode_trigger``.

            The push/release sentinels are returned as they are; any other
            raw value is divided by 1000.
            """
            if raw in [cls.TRIGGER_PUSH, cls.TRIGGER_RELEASE]:
                return raw
            else:
                return raw / 1000

        def get_bytes(self):
            """Serialize this entry as the ``'<HBHB'`` header plus ``data``."""
            # Coerce each flag to a strict bool before shifting: a truthy int
            # such as 128 would otherwise be shifted out of the uint8 range
            # and make struct.pack fail.
            flags = (bool(self.when_online) << 7 | bool(self.when_offline) << 6
                     | bool(self.canceling) << 5 | self.evt_type)
            header = struct.pack('<HBHB', Button.EventConfig.encode_trigger(self.trigger), flags,
                                 self.addr, len(self.data))
            return header + self.data

        @classmethod
        def from_bytes(cls, data):
            """Deserialize an entry produced by ``get_bytes``.

            The length byte in the header is not used; everything after the
            6-byte header is taken as the payload.
            """
            trigger_raw, flags, addr, _ = struct.unpack('<HBHB', data[:6])
            # Flags are normalized to real booleans so that a decoded entry
            # compares equal to one built with True/False and can be passed
            # through get_bytes() again.
            return Button.EventConfig(trigger=Button.EventConfig.decode_trigger(trigger_raw),
                                      evt_type=flags & 3,
                                      addr=addr,
                                      data=data[6:],
                                      canceling=bool(flags & (1 << 5)),
                                      when_online=bool(flags & (1 << 7)),
                                      when_offline=bool(flags & (1 << 6)))