Source code for caspia.node.components.digitalinput
import struct
from dataclasses import dataclass
from typing import ClassVar
import caspia.node
from caspia.node.events import BroadcastEvent
from caspia.node.pin import Pin
from .base import Component
[docs]class DigitalInput(Component):
type = 0x04
@classmethod
def parse_broadcast(cls, broadcast):
    """Build the event object that corresponds to a received broadcast.

    The first byte of ``broadcast.component_data`` selects the event type:
    ``0x00`` produces a ``DigitalInput.LowToHighEvent`` and ``0x01`` a
    ``DigitalInput.HighToLowEvent``, each constructed from ``broadcast``.

    :param broadcast: object exposing an indexable ``component_data``.
    :returns: the event instance created from ``broadcast``.
    :raises ValueError: if the first data byte is neither ``0x00`` nor
        ``0x01``.
    """
    edge_code = broadcast.component_data[0]
    if edge_code == 0x00:
        return DigitalInput.LowToHighEvent(broadcast)
    if edge_code == 0x01:
        return DigitalInput.HighToLowEvent(broadcast)
    # Anything else is not a known edge marker.
    raise ValueError('Invalid broadcast')
class State(Component.State):
    """Last known logical level of the digital input.

    ``value`` holds whatever was passed to the constructor (``None`` by
    default) until one of the ``update_from_*`` methods sets it to
    ``True`` or ``False``.
    """

    def __init__(self, value=None):
        """Store the initial ``value``."""
        self.value = value

    def update_from_bytes(self, data):
        """Set ``value`` from the first byte of ``data``.

        ``0x00`` maps to ``False`` and ``0x01`` maps to ``True``.

        :param data: indexable sequence whose first item encodes the level.
        :raises ValueError: if the first item is neither ``0x00`` nor
            ``0x01``; ``value`` is left unchanged in that case.
        """
        level = data[0]
        if level not in (0x00, 0x01):
            raise ValueError
        self.value = level == 0x01

    def update_from_event(self, event):
        """Set ``value`` according to an edge event.

        A ``DigitalInput.LowToHighEvent`` sets ``value`` to ``True`` and a
        ``DigitalInput.HighToLowEvent`` sets it to ``False``. Any other
        event leaves ``value`` untouched.
        """
        if isinstance(event, DigitalInput.LowToHighEvent):
            self.value = True
            return
        if isinstance(event, DigitalInput.HighToLowEvent):
            self.value = False

    def __repr_fields__(self):
        """Return the mapping of field names to values used for the repr."""
        return {'value': self.value}
@dataclass
class Config(Component.Config):
    """Configuration of a digital input and its packed 6-byte encoding.

    Packed layout (little-endian, struct format ``'<BBHBB'``):

    1. pin number
    2. mode (``MODE_PULL_UP`` or ``MODE_PULL_DOWN``)
    3. sampling period, computed as ``1000 / sampling_freq``
    4. ``samples_till_change``
    5. flag byte: bit 0 = ``broadcast_on_low_to_high``,
       bit 1 = ``broadcast_on_high_to_low``, bit 2 = ``inverted``
    """

    MODE_PULL_UP: ClassVar[int] = 0x01
    MODE_PULL_DOWN: ClassVar[int] = 0x02

    pin: Pin
    mode: int = MODE_PULL_UP
    # presumably in Hz, which would make the packed period milliseconds
    # -- TODO confirm against the node firmware
    sampling_freq: int = 100
    samples_till_change: int = 3
    broadcast_on_low_to_high: bool = True
    broadcast_on_high_to_low: bool = True
    inverted: bool = False

    def get_bytes(self):
        """Serialize this configuration into its packed byte form.

        :returns: ``bytes`` of length 6 in the layout described on the class.
        """
        flag_byte = (
            (self.broadcast_on_low_to_high << 0)
            | (self.broadcast_on_high_to_low << 1)
            | (self.inverted << 2)
        )
        # int() truncates toward zero, so frequencies that do not divide
        # 1000 evenly are packed as the next lower whole period.
        payload = (
            self.pin.number,
            self.mode,
            int(1000 / self.sampling_freq),
            self.samples_till_change,
            flag_byte,
        )
        return struct.pack('<BBHBB', *payload)

    @classmethod
    def from_bytes(cls, data):
        """Create a configuration from its packed byte form.

        :param data: exactly 6 bytes in the layout described on the class.
        :returns: a new instance populated from ``data``.
        """
        unpacked = struct.unpack('<BBHBB', data)
        pin_num, mode, period, samples, flag_byte = unpacked
        pin = caspia.node.Pin(pin_num)
        # NOTE(review): true division stores a float in the int-annotated
        # ``sampling_freq`` field and raises ZeroDivisionError when the
        # packed period is 0 -- confirm whether this is intended.
        frequency = 1000 / period
        return cls(
            pin=pin,
            mode=mode,
            sampling_freq=frequency,
            samples_till_change=samples,
            broadcast_on_low_to_high=bool(flag_byte & (1 << 0)),
            broadcast_on_high_to_low=bool(flag_byte & (1 << 1)),
            inverted=bool(flag_byte & (1 << 2)),
        )