Source code for caspia.node.components.button

import struct
from dataclasses import dataclass, field
from typing import ClassVar, List

from caspia.node import Pin, binary, config, errors
from caspia.node.events import BroadcastEvent

from .base import Component


class Button(Component):
    """Button component (component type ``0x02``).

    Groups the broadcast events a button emits, its runtime state, and the
    binary (de)serialization of its configuration.
    """

    type = 0x02

    @classmethod
    def parse_broadcast(cls, broadcast):
        """Build the event object matching a raw button broadcast.

        The first byte of ``broadcast.component_data`` selects the event:
        ``0x00`` -> :class:`PushEvent`, ``0x01`` -> :class:`ReleaseEvent`,
        ``0x10`` -> :class:`ButtonEvent`.

        Raises:
            errors.CaspiaError: if the first byte is none of the above.
        """
        kind = broadcast.component_data[0]
        if kind == 0x00:
            return Button.PushEvent(broadcast)
        if kind == 0x01:
            return Button.ReleaseEvent(broadcast)
        if kind == 0x10:
            return Button.ButtonEvent(broadcast)
        raise errors.CaspiaError('Invalid broadcast')

    class PushEvent(BroadcastEvent, Component.Event):
        """Broadcasted on button push."""

    class ReleaseEvent(BroadcastEvent, Component.Event):
        """Broadcasted on button release."""

    @dataclass
    class ButtonEvent(BroadcastEvent, Component.Event):
        """Broadcasted on Hold, Click, etc ...

        ``event`` and ``interval`` are not constructor arguments; they are
        decoded from the broadcast payload in :meth:`__post_init__`.
        """

        event: int = field(init=False)
        # Hold duration in seconds (0 for a click).
        interval: float = field(init=False)

        EVENT_CLICK: ClassVar[int] = 0x01
        EVENT_HOLD: ClassVar[int] = 0x02

        @staticmethod
        def create_raw_broadcast_data(component_id, evt_type, interval=None):
            """Encode the raw bytes of a click/hold broadcast.

            Layout: ``component_id``, ``0x10``, event code and, for a hold
            only, the interval as a little-endian unsigned 16-bit count of
            milliseconds.

            Args:
                component_id: Value of the first byte of the payload.
                evt_type: ``EVENT_CLICK`` or ``EVENT_HOLD``.
                interval: Hold duration in seconds; required for
                    ``EVENT_HOLD`` and ignored for ``EVENT_CLICK``.

            Raises:
                ValueError: if ``evt_type`` is unknown, or if ``interval``
                    is missing for ``EVENT_HOLD``.
            """
            data = bytes([component_id, 0x10])
            if evt_type == Button.ButtonEvent.EVENT_CLICK:
                data += b'\x01'
            elif evt_type == Button.ButtonEvent.EVENT_HOLD:
                if interval is None:
                    raise ValueError('interval is required for EVENT_HOLD.')
                # Round rather than truncate: e.g. 1.001 * 1000 evaluates to
                # 1000.9999999999999 and would otherwise be sent as 1000 ms.
                data += struct.pack('<BH', 0x02, int(round(interval * 1000)))
            else:
                raise ValueError('Invalid evt_type.')
            return data

        def __post_init__(self):
            """Decode ``event`` and ``interval`` from the broadcast payload.

            Byte 1 of ``broadcast.component_data`` is the event code; for a
            hold, bytes 2-3 hold the interval in milliseconds (little-endian
            unsigned 16-bit).

            Raises:
                ValueError: if the event code is neither click nor hold.
            """
            super().__post_init__()
            payload = self.broadcast.component_data
            self.event = payload[1]
            if self.event == Button.ButtonEvent.EVENT_HOLD:
                millis, = struct.unpack('<H', payload[2:4])
                self.interval = millis / 1000
            elif self.event == Button.ButtonEvent.EVENT_CLICK:
                self.interval = 0
            else:
                raise ValueError('Invalid broadcast data')

    @dataclass
    class State(Component.State):
        """Runtime state of a button, tracked from push/release events."""

        # Defaults to None; only update_from_event assigns True/False.
        is_pushed: bool = None

        def update_from_bytes(self, data):
            """Do nothing: this state is not updated from raw bytes."""
            pass

        def update_from_event(self, event):
            """Set ``is_pushed`` from a push or release event.

            Any other event type leaves the state unchanged.
            """
            if isinstance(event, Button.PushEvent):
                self.is_pushed = True
            elif isinstance(event, Button.ReleaseEvent):
                self.is_pushed = False

    @dataclass
    class Config(Component.Config):
        """Button configuration: the input pin plus its event entries."""

        pin: Pin
        events: List['EventConfig'] = field(default_factory=list)

        def add_event(self, event):
            """Append an :class:`EventConfig` to this configuration."""
            self.events.append(event)

        def get_bytes(self):
            """Serialize as the pin number byte followed by the event list.

            Events are written in descending order of their encoded trigger
            value.
            """
            header = bytes([self.pin.number])
            ordered = sorted(
                self.events,
                key=lambda evt: evt.encode_trigger(evt.trigger),
                reverse=True)
            events_data = binary.encode_list(
                [evt.get_bytes() for evt in ordered])
            return header + events_data

        @classmethod
        def from_bytes(cls, data):
            """Parse a configuration produced by :meth:`get_bytes`.

            Byte 0 is the pin number; the remainder is the encoded list of
            event entries.
            """
            event_entries, _ = binary.decode_list(data[1:])
            # Distinct loop name so the ``data`` parameter is not shadowed.
            events = [Button.EventConfig.from_bytes(entry)
                      for entry in event_entries]
            return cls(pin=Pin(data[0]), events=events)

    @dataclass
    class EventConfig(config.Config):
        """One configured action attached to a button trigger.

        ``trigger`` is either ``TRIGGER_PUSH`` / ``TRIGGER_RELEASE`` or a
        duration in seconds, which is stored on the wire as milliseconds in
        an unsigned 16-bit field.
        """

        TRIGGER_PUSH: ClassVar[int] = 0xFFFE
        TRIGGER_RELEASE: ClassVar[int] = 0xFFFF

        TYPE_REQUEST: ClassVar[int] = 0x00
        TYPE_BROADCAST: ClassVar[int] = 0x01

        trigger: int
        evt_type: int
        addr: int
        data: bytes
        canceling: bool = True
        when_online: bool = True
        when_offline: bool = True

        @classmethod
        def encode_trigger(cls, trigger):
            """Return the 16-bit wire value for ``trigger``.

            The sentinels ``TRIGGER_PUSH`` and ``TRIGGER_RELEASE`` pass
            through unchanged; any other value is taken as seconds and
            converted to the nearest whole millisecond.
            """
            if trigger in (cls.TRIGGER_PUSH, cls.TRIGGER_RELEASE):
                return trigger
            # Round so that encode_trigger(decode_trigger(raw)) == raw even
            # when raw / 1000 is not exactly representable as a float.
            return int(round(trigger * 1000))

        @classmethod
        def decode_trigger(cls, raw):
            """Inverse of :meth:`encode_trigger`.

            Sentinels are returned as-is; other values are converted from
            milliseconds to seconds.
            """
            if raw in (cls.TRIGGER_PUSH, cls.TRIGGER_RELEASE):
                return raw
            return raw / 1000

        def get_bytes(self):
            """Serialize this entry as a 6-byte header followed by ``data``.

            Header (little-endian ``<HBHB``): encoded trigger, flags byte,
            address, payload length. Flags: bit 7 = ``when_online``,
            bit 6 = ``when_offline``, bit 5 = ``canceling``, low bits =
            ``evt_type``.
            """
            # bool() keeps each flag to a single bit even if the attribute
            # holds a truthy int, so the flags value cannot overflow a byte
            # through these fields.
            flags = (bool(self.when_online) << 7
                     | bool(self.when_offline) << 6
                     | bool(self.canceling) << 5
                     | self.evt_type)
            header = struct.pack(
                '<HBHB',
                Button.EventConfig.encode_trigger(self.trigger),
                flags,
                self.addr,
                len(self.data))
            return header + self.data

        @classmethod
        def from_bytes(cls, data):
            """Parse an entry produced by :meth:`get_bytes`.

            The length byte of the header is not used; everything after the
            6-byte header is taken as the payload.
            """
            trigger_raw, flags, addr, _ = struct.unpack('<HBHB', data[:6])
            # Convert the masked bits to real booleans; storing the raw
            # masks (32/64/128) would break a later get_bytes() call and
            # equality with instances built from True/False.
            return Button.EventConfig(
                trigger=Button.EventConfig.decode_trigger(trigger_raw),
                evt_type=flags & 3,
                addr=addr,
                data=data[6:],
                canceling=bool(flags & (1 << 5)),
                when_online=bool(flags & (1 << 7)),
                when_offline=bool(flags & (1 << 6)))