Source code for caspia.node.components.system

import asyncio
import enum
import struct
from dataclasses import dataclass, field
from typing import ClassVar

from caspia.node import binary, errors, utils
from caspia.node.events import BroadcastEvent

from .base import Component


class System(Component):
    """System component of a node (component type ``0x00``).

    Wraps the system-level requests of a node: identification, build
    information, configuration buffer management, restart and liveness
    checks. Every request is sent through ``self.request``, which is
    provided by the ``Component`` base class.
    """

    type = 0x00

    @classmethod
    def parse_broadcast(cls, broadcast):
        """Convert a system broadcast into an event object.

        Returns a ``System.HeartbeatEvent`` when the first byte of
        ``broadcast.component_data`` is ``0x01``.

        Raises:
            ValueError: if the component data is empty or starts with any
                other byte.
        """
        data = broadcast.component_data
        if data and data[0] == 0x01:
            return System.HeartbeatEvent(broadcast)
        raise ValueError('Invalid broadcast')

    async def read_hwid(self):
        """Send request ``0x04`` and return the raw response (hardware id)."""
        return await self.request(b'\x04')

    async def read_buildinfo(self):
        """Send request ``0x17`` and return the response as `System.BuildInfo`."""
        data = await self.request(b'\x17')
        return System.BuildInfo(data)

    async def config_reset(self):
        """ Reset config's buffer to the content of FLASH. """
        await self.request(b'\x13')

    async def config_clear(self):
        """ Set config's buffer to all zeros. """
        await self.request(b'\x14')

    async def config_hash_read(self) -> int:
        """ Read hash of current config's buffer.

        The response is decoded as one little-endian unsigned 32-bit integer.
        """
        data = await self.request(b'\x10')
        return struct.unpack('<I', data)[0]

    async def config_write(self, data):
        """ Write `data` to node's config buffer.

        The data is sent in chunks of at most 16 bytes. Every chunk is
        prefixed with the command byte ``0x11`` and its offset encoded as a
        little-endian unsigned 16-bit integer, and is sent with a timeout
        of 10.
        """
        chunk_size = 16
        for offset in range(0, len(data), chunk_size):
            chunk = data[offset:offset + chunk_size]
            prefix = struct.pack('<BH', 0x11, offset)
            await self.request(prefix + chunk, timeout=10)

    async def config_flash(self):
        """ Write the config buffer to FLASH. """
        await self.request(b'\x12')

    async def config_current_length(self) -> int:
        """ Get length of current configuration.

        The response is decoded as one little-endian unsigned 16-bit integer.
        """
        response = await self.request(b'\x15')
        return struct.unpack('<H', response)[0]

    async def config_current_read_chunk(self, offset, length):
        """ Read part of current config's data.

        `offset` and `length` are both sent as little-endian unsigned
        16-bit integers after the command byte ``0x16``.
        """
        return await self.request(b'\x16' + struct.pack('<HH', offset, length))

    async def config_current_read(self):
        """ Read data of current config.

        Queries the current length first and then requests the whole range
        ``[0, length)`` as a single chunk.
        """
        length = await self.config_current_length()
        return await self.config_current_read_chunk(0, length)

    async def restart(self):
        """Send the restart request (``0x01``) and wait for its outcome.

        Returns as soon as either the request itself completes or a
        ``System.HeartbeatEvent`` is received from ``self.node.client``,
        whichever happens first. The awaitable that did not finish is
        cancelled. Results of the finished one are not inspected.
        """

        def accept_event(event):
            # Only a heartbeat is accepted; anything else yields None.
            if isinstance(event, System.HeartbeatEvent):
                return True
            return None

        req = self.request(b'\x01')
        event = utils.receive_event(accept_event, self.node.client)

        # asyncio.wait() rejects bare coroutine objects on Python 3.11+
        # (deprecated since 3.8), so both awaitables are wrapped explicitly.
        tasks = [asyncio.ensure_future(event), asyncio.ensure_future(req)]
        try:
            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # Also runs when restart() itself is cancelled, so that no
            # background task is left behind.
            for task in tasks:
                if not task.done():
                    task.cancel()

    async def invoke_isp(self):
        """Send request ``0x03`` (invoke ISP) and wait for the response."""
        await self.request(b'\x03')

    async def ping(self):
        """Send request ``0x05`` (ping) and wait for the response."""
        await self.request(b'\x05')

    async def trigger_wdt_reset(self):
        """Send request ``0xF0`` (trigger watchdog reset) and wait for the response."""
        await self.request(b'\xF0')

    @dataclass
    class HeartbeatEvent(BroadcastEvent, Component.Event):
        """Heartbeat broadcast decoded from two bytes of component data.

        The second byte carries the flags:

        * bit 0 - ``after_restart``
        * bit 1 - ``online``
        * bits 2-4 - ``reset_reason`` (a `System.ResetReason` value)
        """

        after_restart: bool = field(init=False)
        online: bool = field(init=False)
        reset_reason: 'System.ResetReason' = field(init=False)

        def __post_init__(self):
            """Decode the flag byte.

            Raises:
                ValueError: if the component data is not exactly two bytes
                    long, or if the reset reason bits do not map to a
                    `System.ResetReason` member.
            """
            super().__post_init__()
            data = self.broadcast.component_data
            if len(data) != 2:
                raise ValueError('Invalid heartbeat broadcast')

            self.after_restart = bool(data[1] & 0x01)
            self.online = bool(data[1] & 0x02)
            self.reset_reason = System.ResetReason(0b111 & (data[1] >> 2))

    class State(Component.State):
        """State of the system component; it carries no fields."""

        def update_from_bytes(self, data):
            """Do nothing; there is no state to update."""
            pass

        def update_from_event(self, event):
            """Do nothing; there is no state to update."""
            pass

        def __repr_fields__(self):
            return {}

    @dataclass
    class Config(Component.Config):
        """Configuration of the system component.

        Serialized as a dictionary (see ``binary.encode_dictionary``) keyed
        by the ``ENTRY_*`` constants.
        """

        ENTRY_CAN_ID: ClassVar[int] = 0x00
        ENTRY_METADATA: ClassVar[int] = 0x01
        ENTRY_ONLINE_TIMEOUT: ClassVar[int] = 0x02
        ENTRY_MOCK_I2C_HW: ClassVar[int] = 0x03

        can_id: int
        metadata: dict = None
        online_timeout: int = None
        mock_i2c_hw: bool = False

        def __post_init__(self):
            """Apply the default online timeout of 30 when none is given."""
            super().__post_init__()
            if self.online_timeout is None:
                self.online_timeout = 30

        def get_bytes(self):
            """Serialize the configuration.

            ``can_id`` and ``online_timeout`` are packed as little-endian
            unsigned 16-bit integers and ``mock_i2c_hw`` as a single byte.
            ``metadata`` is encoded as a nested dictionary and only emitted
            when it is not ``None``.
            """
            entries = {
                System.Config.ENTRY_CAN_ID:
                    struct.pack('<H', self.can_id),
                System.Config.ENTRY_ONLINE_TIMEOUT:
                    struct.pack('<H', self.online_timeout),
                System.Config.ENTRY_MOCK_I2C_HW:
                    struct.pack('<B', self.mock_i2c_hw),
            }
            if self.metadata is not None:
                metadata_bytes = binary.encode_dictionary(self.metadata)
                entries[System.Config.ENTRY_METADATA] = metadata_bytes

            return binary.encode_dictionary(entries)

        @classmethod
        def from_bytes(cls, data):
            """Build a configuration from its serialized form.

            Only the CAN id entry is mandatory. A missing metadata or online
            timeout entry is passed on as ``None`` and a missing mock flag
            as ``False``.

            Raises:
                errors.InvalidConfigurationError: if the CAN id entry is
                    missing.
            """
            entries, _ = binary.decode_dictionary(data)

            if cls.ENTRY_CAN_ID not in entries:
                raise errors.InvalidConfigurationError
            can_id, = struct.unpack('<H', entries[cls.ENTRY_CAN_ID])

            metadata = None
            if cls.ENTRY_METADATA in entries:
                # decode_dictionary returns a 2-tuple; only its first item
                # is the decoded dictionary.
                metadata, _ = binary.decode_dictionary(
                    entries[cls.ENTRY_METADATA])

            online_timeout = None
            if cls.ENTRY_ONLINE_TIMEOUT in entries:
                online_timeout, = struct.unpack(
                    '<H', entries[cls.ENTRY_ONLINE_TIMEOUT])

            # The flag defaults to False (not None) so that a decoded config
            # can always be serialized again by get_bytes().
            mock_i2c_hw = False
            if cls.ENTRY_MOCK_I2C_HW in entries:
                # The entry is raw bytes; bool() of a non-empty bytes object
                # is always True, so the byte values are inspected instead.
                mock_i2c_hw = any(entries[cls.ENTRY_MOCK_I2C_HW])

            return cls(can_id=can_id,
                       metadata=metadata,
                       online_timeout=online_timeout,
                       mock_i2c_hw=mock_i2c_hw)

    class BuildInfo:
        """Build information decoded from the response to request ``0x17``.

        Attributes:
            version: entry ``0x01`` decoded as UTF-8.
            supported_component_types: entry ``0x02`` converted to a list.
        """

        def __init__(self, data):
            info, _ = binary.decode_dictionary(data)
            self.version = info[0x01].decode('utf-8')
            self.supported_component_types = list(info[0x02])

        def __repr_fields__(self):
            """Return the fields, with known component types shown by class name."""
            known_classes = Component.component_classes()
            types = []
            for comp_type in self.supported_component_types:
                comp_cls = known_classes.get(comp_type)
                # Types without a registered class are kept as raw values.
                types.append(comp_cls.__name__ if comp_cls else comp_type)
            return {
                'version': self.version,
                'supported_component_types': types,
            }

    class ResetReason(enum.Enum):
        """Reset reason reported in bits 2-4 of the heartbeat flag byte."""

        UNKNOWN = 0x00
        POWER_ON_RESET = 0x01
        EXTERNAL = 0x02
        WATCHDOG = 0x03
        BROWNOUT = 0x04
        SOFTWARE = 0x05