Source code for caspia.node.utils

# pylint: disable=broad-except
import asyncio
import inspect
import logging
import struct

from caspia.node import components, errors
from caspia.node.broadcast import Broadcast
from caspia.node.components import Component

logger = logging.getLogger(__name__)


[docs]async def make_hwid_request(client, hwid, cmd): await client.broadcast(0x20, b'\x01' + hwid[0:7]) await asyncio.sleep(0.2) await client.broadcast(0x20, b'\x02' + hwid[7:14]) await asyncio.sleep(0.2) await client.broadcast(0x20, b'\x03' + hwid[14:16] + cmd)
[docs]async def assign_can_id_by_hwid(hwid, can_id, client): """Look for a node with given hwid and temporarily change its can_id. :raises: TimeoutError if there is no response from the node. """ cmd = struct.pack('<BH', 0x01, can_id) await make_hwid_request(client, hwid, cmd) def filter_broadcast(broadcast_evt): if isinstance(broadcast_evt, components.System.HeartbeatEvent): return broadcast_evt.broadcast.source == can_id return False await receive_event(filter_broadcast, client)
[docs]async def restart_by_hwid(hwid, client): await make_hwid_request(client, hwid, b'\x03')
[docs]async def check_node_in_isp_present(bus, wait_interval=1.0): """Return None if no node is present. If is present, returns its hwid.""" import programcan pc_node = programcan.LPC11CNode(bus) try: return await asyncio.wait_for(pc_node.read_uid(), timeout=wait_interval) except asyncio.TimeoutError: return None
[docs]async def reset_node_from_isp(bus, timeout=1.0): import programcan pc_node = programcan.LPC11CNode(bus) await asyncio.wait_for(programcan.programmer.reset(pc_node), timeout=timeout)
async def _invoke_isp_using_can_id(client, can_id): from caspia.node import Node node = Node(client, can_id) # invoke isp using can_id try: await node.system.invoke_isp() except asyncio.TimeoutError: # we might get timeout, as the node might not finished sending the response packet pass hwid = await check_node_in_isp_present(node.client) if hwid is None: raise errors.CaspiaError('Failed to invoke ISP using can id.') return hwid async def _invoke_isp_using_hwid(client, hwid): cmd = bytes([0x02]) await make_hwid_request(client, hwid, cmd) await asyncio.sleep(0.1) actual_hwid = await check_node_in_isp_present(client) if actual_hwid is None: raise errors.CaspiaError('Failed to invoke ISP using hwid.') if actual_hwid != hwid: msg = ('failed to invoke ISP using hwid. HWID does not match. ' f'(expected: {hwid.hex()}, actual: {actual_hwid.hex()})') raise errors.CaspiaError(msg) return actual_hwid
[docs]async def invoke_isp_safely(client, hwid=None, can_id=None): # precheck, that no other node is in ISP present_hwid = await check_node_in_isp_present(client) if present_hwid is not None and (present_hwid != hwid or hwid is None): msg = ('Failed to invoke ISP mode. ' f'Node with HWID {present_hwid.hex()} is already in ISP mode.') raise errors.CaspiaError(msg) if present_hwid is not None: return present_hwid # already in ISP with expected HWID try: if hwid is not None: return await _invoke_isp_using_hwid(client, hwid) elif can_id is not None: return await _invoke_isp_using_can_id(client, can_id) else: raise errors.CaspiaError('HWID not specified (nor CAN_ID).') except Exception: try: # last try to cleanup after ourselves await reset_node_from_isp(client) except Exception: pass raise
[docs]async def receive_event(accept, client, timeout=5.0): """ Wait for a specific broadcast. :param accept: If callable, must accept one argument (Event). Should return True for the expected broadcast. :param client: pollen_client to use :param timeout: raise TimeoutError after `timeout` :return: The Event instance. """ def _accept(source, group, data, **kwargs): try: broadcast = Broadcast(source, group, data, **kwargs) event = parse_broadcast(broadcast) if callable(accept) and accept(event): return event elif inspect.isclass(accept) and \ issubclass(accept, Component.Event) and isinstance(event, accept): return event else: return False except errors.CaspiaError: return False return await client.await_broadcast(_accept, timeout=timeout)
[docs]def parse_broadcast(broadcast): """ Creates specific subclass of Event from Broadcast. """ if broadcast.group & 0x100: # component broadcast comp_type = broadcast.group & 0xFF component_classes = Component.component_classes() if comp_type not in component_classes: raise errors.InvalidComponentError return component_classes[comp_type].parse_broadcast(broadcast) else: raise NotImplementedError
[docs]def hwid_read(src): hwid = src if not isinstance(hwid, str): hwid = src.read() hwid = filter(lambda c: c.isdigit() or c in 'abcdef', hwid.lower()) hwid = ''.join(hwid) if len(hwid) != 32: raise ValueError('Invalid length of HWID (should be 32 hex digits).') return bytes.fromhex(hwid)