import asyncio
import enum
import struct
from dataclasses import dataclass, field
from typing import ClassVar
from caspia.node import binary, errors, utils
from caspia.node.events import BroadcastEvent
from .base import Component
class System(Component):
    """ System component (component type ``0x00``) of a node.

    Wraps the system-level requests (configuration buffer handling, restart,
    ping, build info, ...) and parses the heartbeat broadcast.
    """

    type = 0x00

    @classmethod
    def parse_broadcast(cls, broadcast):
        """ Convert a system broadcast into an event.

        :param broadcast: broadcast whose ``component_data`` is inspected.
        :returns: a :class:`System.HeartbeatEvent` when the first data byte
            is ``0x01``.
        :raises ValueError: when the data is empty or starts with any other
            byte (or when the heartbeat payload itself is malformed).
        """
        data = broadcast.component_data
        if len(data) > 0 and data[0] == 0x01:
            return System.HeartbeatEvent(broadcast)
        raise ValueError('Invalid broadcast')

    async def read_hwid(self):
        """ Send request ``0x04`` and return the raw response. """
        return await self.request(b'\x04')

    async def read_buildinfo(self):
        """ Send request ``0x17`` and wrap the response in `BuildInfo`. """
        data = await self.request(b'\x17')
        return System.BuildInfo(data)

    async def config_reset(self):
        """ Reset config's buffer to the content of FLASH. """
        await self.request(b'\x13')

    async def config_clear(self):
        """ Set config's buffer to all zeros. """
        await self.request(b'\x14')

    async def config_hash_read(self) -> int:
        """ Read hash of current config's buffer. """
        data = await self.request(b'\x10')
        # The response is decoded as one little-endian unsigned 32-bit value.
        return struct.unpack('<I', data)[0]

    async def config_write(self, data):
        """ Write `data` to node's config buffer.

        The data is sent in chunks of at most 16 bytes; every chunk is
        prefixed with the command byte ``0x11`` and its little-endian 16-bit
        offset, and is awaited before the next one is sent.
        """
        chunk_size = 16
        for offset in range(0, len(data), chunk_size):
            chunk = data[offset:offset + chunk_size]
            prefix = struct.pack('<BH', 0x11, offset)
            await self.request(prefix + chunk, timeout=10)

    async def config_flash(self):
        """ Write the config buffer to FLASH. """
        await self.request(b'\x12')

    async def config_current_length(self) -> int:
        """ Get length of current configuration. """
        response = await self.request(b'\x15')
        # The response is decoded as one little-endian unsigned 16-bit value.
        return struct.unpack('<H', response)[0]

    async def config_current_read_chunk(self, offset, length):
        """ Read part of current config's data.

        `offset` and `length` are both sent as little-endian unsigned
        16-bit values after the command byte ``0x16``.
        """
        return await self.request(b'\x16' + struct.pack('<HH', offset, length))

    async def config_current_read(self):
        """ Read data of current config. """
        length = await self.config_current_length()
        return await self.config_current_read_chunk(0, length)

    async def restart(self):
        """ Send request ``0x01`` and wait for its outcome.

        Returns as soon as either the request completes or a
        `System.HeartbeatEvent` is received, whichever happens first; the
        awaitable that is still pending at that point is cancelled.
        """

        def accept_event(event):
            if isinstance(event, System.HeartbeatEvent):
                return True
            return None

        # asyncio.wait() no longer accepts bare coroutines (deprecated in
        # Python 3.8, TypeError since 3.11), so both awaitables are wrapped
        # explicitly. ensure_future() passes existing futures through as-is.
        req = asyncio.ensure_future(self.request(b'\x01'))
        event = asyncio.ensure_future(
            utils.receive_event(accept_event, self.node.client))
        _, pending = await asyncio.wait(
            [event, req], return_when=asyncio.FIRST_COMPLETED)
        for fut in pending:
            fut.cancel()

    async def invoke_isp(self):
        """ Send request ``0x03``. """
        await self.request(b'\x03')

    async def ping(self):
        """ Send request ``0x05``. """
        await self.request(b'\x05')

    async def trigger_wdt_reset(self):
        """ Send request ``0xF0``. """
        await self.request(b'\xF0')

    @dataclass
    class HeartbeatEvent(BroadcastEvent, Component.Event):
        """ Heartbeat broadcast decoded from a two-byte payload.

        All three fields are derived from the second payload byte in
        ``__post_init__``; none of them is a constructor argument.
        """

        after_restart: bool = field(init=False)
        online: bool = field(init=False)
        reset_reason: 'System.ResetReason' = field(init=False)

        def __post_init__(self):
            super().__post_init__()
            data = self.broadcast.component_data
            if len(data) != 2:
                raise ValueError('Invalid heartbeat broadcast')
            flags = data[1]
            self.after_restart = bool(flags & 0x01)  # bit 0
            self.online = bool(flags & 0x02)  # bit 1
            # Bits 2-4 carry the reset reason; a code without a matching
            # ResetReason member makes the enum lookup raise ValueError.
            self.reset_reason = System.ResetReason(0b111 & (flags >> 2))

    class State(Component.State):
        """ State of the system component; it carries no fields. """

        def update_from_bytes(self, data):
            """ Ignore `data`; there is no state to update. """
            pass

        def update_from_event(self, event):
            """ Ignore `event`; there is no state to update. """
            pass

        def __repr_fields__(self):
            return {}

    @dataclass
    class Config(Component.Config):
        """ Configuration of the system component.

        Serialized as a dictionary (see ``binary.encode_dictionary``) keyed
        by the ``ENTRY_*`` constants.
        """

        ENTRY_CAN_ID: ClassVar[int] = 0x00
        ENTRY_METADATA: ClassVar[int] = 0x01
        ENTRY_ONLINE_TIMEOUT: ClassVar[int] = 0x02
        ENTRY_MOCK_I2C_HW: ClassVar[int] = 0x03

        can_id: int
        metadata: dict = None
        online_timeout: int = None
        mock_i2c_hw: bool = False

        def __post_init__(self):
            super().__post_init__()
            # None means "not specified"; fall back to 30.
            if self.online_timeout is None:
                self.online_timeout = 30

        def get_bytes(self):
            """ Serialize the configuration.

            ``can_id`` and ``online_timeout`` are packed as little-endian
            unsigned 16-bit values and ``mock_i2c_hw`` as a single byte.
            The metadata entry is written only when `metadata` is not None.
            """
            entries = {
                System.Config.ENTRY_CAN_ID: struct.pack('<H', self.can_id),
                System.Config.ENTRY_ONLINE_TIMEOUT: struct.pack('<H', self.online_timeout),
                System.Config.ENTRY_MOCK_I2C_HW: struct.pack('<B', self.mock_i2c_hw),
            }
            if self.metadata is not None:
                metadata_bytes = binary.encode_dictionary(self.metadata)
                entries[System.Config.ENTRY_METADATA] = metadata_bytes
            return binary.encode_dictionary(entries)

        @classmethod
        def from_bytes(cls, data):
            """ Build a configuration from its serialized form.

            :raises errors.InvalidConfigurationError: when the CAN ID entry
                is missing.
            """
            entries, _ = binary.decode_dictionary(data)
            if cls.ENTRY_CAN_ID not in entries:
                raise errors.InvalidConfigurationError
            can_id, = struct.unpack('<H', entries[cls.ENTRY_CAN_ID])

            metadata = None
            if cls.ENTRY_METADATA in entries:
                # decode_dictionary returns a 2-tuple; only its first item
                # is the decoded dictionary.
                metadata, _ = binary.decode_dictionary(entries[cls.ENTRY_METADATA])

            # None lets __post_init__ apply the default timeout.
            online_timeout = None
            if cls.ENTRY_ONLINE_TIMEOUT in entries:
                online_timeout, = struct.unpack('<H', entries[cls.ENTRY_ONLINE_TIMEOUT])

            # A missing entry keeps the field's default of False.
            mock_i2c_hw = False
            if cls.ENTRY_MOCK_I2C_HW in entries:
                # bool() of a non-empty bytes object is always True, even
                # for b'\x00', so test the byte values themselves.
                mock_i2c_hw = any(entries[cls.ENTRY_MOCK_I2C_HW])

            return cls(can_id=can_id,
                       metadata=metadata,
                       online_timeout=online_timeout,
                       mock_i2c_hw=mock_i2c_hw)

    class BuildInfo:
        """ Build information decoded from a serialized dictionary.

        Entry ``0x01`` is decoded as the UTF-8 version string; entry
        ``0x02`` is turned into a list with one component type per item.
        """

        def __init__(self, data):
            info, _ = binary.decode_dictionary(data)
            self.version = info[0x01].decode('utf-8')
            self.supported_component_types = list(info[0x02])

        def __repr_fields__(self):
            # Show class names for known component types and the raw type
            # value for those without a registered class.
            registry = Component.component_classes()
            types = []
            for comp_type in self.supported_component_types:
                cls = registry.get(comp_type)
                types.append(cls.__name__ if cls else comp_type)
            return {
                'version': self.version,
                'supported_component_types': types,
            }

    class ResetReason(enum.Enum):
        """ Reset reason code carried by the heartbeat broadcast. """

        UNKNOWN = 0x00
        POWER_ON_RESET = 0x01
        EXTERNAL = 0x02
        WATCHDOG = 0x03
        BROWNOUT = 0x04
        SOFTWARE = 0x05