# pylint: disable=broad-except
import asyncio
import inspect
import logging
import struct
from caspia.node import components, errors
from caspia.node.broadcast import Broadcast
from caspia.node.components import Component
logger = logging.getLogger(__name__)
async def make_hwid_request(client, hwid, cmd):
    """Broadcast a HWID-addressed command, split over three frames.

    The request is sent to broadcast group ``0x20`` as three consecutive
    broadcasts. Each one starts with its sequence byte (``0x01``, ``0x02``,
    ``0x03``) followed by ``hwid[0:7]``, ``hwid[7:14]`` and, in the last
    frame, ``hwid[14:16]`` plus ``cmd``.

    :param client: object providing an awaitable ``broadcast(group, data)``.
    :param hwid: hardware id as bytes; only bytes 0..15 are transmitted.
    :param cmd: command bytes appended to the last frame.
    """
    frames = (
        b'\x01' + hwid[0:7],
        b'\x02' + hwid[7:14],
        b'\x03' + hwid[14:16] + cmd,
    )
    for index, payload in enumerate(frames):
        if index:
            # keep a 0.2 s gap between consecutive frames
            await asyncio.sleep(0.2)
        await client.broadcast(0x20, payload)
async def assign_can_id_by_hwid(hwid, can_id, client):
    """Look for a node with given hwid and temporarily change its can_id.

    Sends HWID command byte ``0x01`` followed by ``can_id`` packed as a
    little-endian unsigned 16-bit integer, then waits (with the default
    timeout of ``receive_event``) for a heartbeat event whose broadcast
    source equals ``can_id``.

    :param hwid: hardware id of the target node (bytes).
    :param can_id: CAN id to assign to the node.
    :param client: client used for sending and receiving broadcasts.
    :raises: TimeoutError if there is no response from the node.
    """
    await make_hwid_request(client, hwid, struct.pack('<BH', 0x01, can_id))

    def is_heartbeat_from_new_id(event):
        # Only a heartbeat coming from the freshly assigned id counts.
        return (isinstance(event, components.System.HeartbeatEvent)
                and event.broadcast.source == can_id)

    await receive_event(is_heartbeat_from_new_id, client)
async def restart_by_hwid(hwid, client):
    """Send HWID command byte ``0x03`` to the node with the given hwid.

    :param hwid: hardware id of the target node (bytes).
    :param client: client used for sending the broadcasts.
    """
    restart_cmd = bytes([0x03])
    await make_hwid_request(client, hwid, restart_cmd)
async def check_node_in_isp_present(bus, wait_interval=1.0):
    """Return None if no node is present. If is present, returns its hwid.

    Tries to read the UID through ``programcan.LPC11CNode(bus)`` and gives up
    after ``wait_interval`` seconds.

    :param bus: object handed to ``programcan.LPC11CNode``.
    :param wait_interval: how long (seconds) to wait for the UID reply.
    :return: result of ``read_uid()``, or None when the read timed out.
    """
    import programcan  # imported at call time, as in the rest of this module

    isp_node = programcan.LPC11CNode(bus)
    try:
        uid = await asyncio.wait_for(isp_node.read_uid(), timeout=wait_interval)
    except asyncio.TimeoutError:
        # nobody answered within the interval -> no node in ISP mode
        return None
    return uid
async def reset_node_from_isp(bus, timeout=1.0):
    """Run ``programcan.programmer.reset`` on the ISP node reachable via bus.

    :param bus: object handed to ``programcan.LPC11CNode``.
    :param timeout: seconds to wait for the reset to complete.
    :raises asyncio.TimeoutError: if the reset does not finish in time.
    """
    import programcan  # imported at call time, as in the rest of this module

    isp_node = programcan.LPC11CNode(bus)
    reset_call = programcan.programmer.reset(isp_node)
    await asyncio.wait_for(reset_call, timeout=timeout)
async def _invoke_isp_using_can_id(client, can_id):
    """Switch the node addressed by ``can_id`` into ISP mode.

    :param client: client used to build the ``Node``.
    :param can_id: CAN id of the target node.
    :return: hwid reported by the node found in ISP mode afterwards.
    :raises errors.CaspiaError: if no node is found in ISP mode afterwards.
    """
    from caspia.node import Node

    target = Node(client, can_id)
    try:
        await target.system.invoke_isp()
    except asyncio.TimeoutError:
        # A timeout is tolerated: the node might not have finished sending
        # its response packet.
        pass

    found_hwid = await check_node_in_isp_present(target.client)
    if found_hwid is not None:
        return found_hwid
    raise errors.CaspiaError('Failed to invoke ISP using can id.')
async def _invoke_isp_using_hwid(client, hwid):
    """Switch the node addressed by ``hwid`` into ISP mode.

    Sends HWID command byte ``0x02``, waits 0.1 s and then verifies that the
    node found in ISP mode reports the same hwid.

    :param client: client used for the request and the presence check.
    :param hwid: hardware id of the target node (bytes).
    :return: the hwid reported by the node in ISP mode.
    :raises errors.CaspiaError: if no node is in ISP mode afterwards, or the
        node that is reports a different hwid.
    """
    await make_hwid_request(client, hwid, b'\x02')
    await asyncio.sleep(0.1)

    reported = await check_node_in_isp_present(client)
    if reported is None:
        raise errors.CaspiaError('Failed to invoke ISP using hwid.')
    if reported != hwid:
        raise errors.CaspiaError(
            'failed to invoke ISP using hwid. HWID does not match. '
            f'(expected: {hwid.hex()}, actual: {reported.hex()})')
    return reported
async def invoke_isp_safely(client, hwid=None, can_id=None):
    """Put a node into ISP mode, refusing to interfere with another ISP node.

    The node is selected by ``hwid`` when given, otherwise by ``can_id``.

    :param client: client used for all bus communication.
    :param hwid: hardware id of the target node (bytes), or None.
    :param can_id: CAN id of the target node, used only when hwid is None.
    :return: hwid of the node that is in ISP mode.
    :raises errors.CaspiaError: if a different node is already in ISP mode,
        if neither ``hwid`` nor ``can_id`` is given, or if invoking fails.
    """
    # Precheck that no other node is in ISP.
    present_hwid = await check_node_in_isp_present(client)
    if present_hwid is not None:
        if hwid is None or present_hwid != hwid:
            msg = ('Failed to invoke ISP mode. '
                   f'Node with HWID {present_hwid.hex()} is already in ISP mode.')
            raise errors.CaspiaError(msg)
        return present_hwid  # already in ISP with expected HWID

    if hwid is None and can_id is None:
        # Nothing has been sent to any node yet, so no cleanup reset is
        # needed for this error.
        raise errors.CaspiaError('HWID not specified (nor CAN_ID).')

    try:
        if hwid is not None:
            return await _invoke_isp_using_hwid(client, hwid)
        return await _invoke_isp_using_can_id(client, can_id)
    except Exception:
        try:  # last try to clean up after ourselves
            await reset_node_from_isp(client)
        except Exception:
            # Best effort only: the original failure is what gets re-raised,
            # but leave a trace of the failed cleanup.
            logger.debug('Reset after failed ISP invocation did not succeed.',
                         exc_info=True)
        raise
async def receive_event(accept, client, timeout=5.0):
    """ Wait for a specific broadcast.

    :param accept: Either a subclass of ``Component.Event`` (any event of
                   that type is accepted), or a callable that accepts one
                   argument (Event) and returns True for the expected
                   broadcast.
    :param client: pollen_client to use
    :param timeout: raise TimeoutError after `timeout`
    :return: The Event instance.
    """
    # Classes are callable too, so the event-class case has to be detected
    # before falling back to calling ``accept`` as a predicate; otherwise the
    # class would be instantiated with the event as its argument.
    accept_is_event_class = (
        inspect.isclass(accept) and issubclass(accept, Component.Event))

    def _accept(source, group, data, **kwargs):
        try:
            broadcast = Broadcast(source, group, data, **kwargs)
            try:
                event = parse_broadcast(broadcast)
            except NotImplementedError:
                # parse_broadcast() does not support non-component
                # broadcasts; they can never be the awaited event.
                return False
            if accept_is_event_class:
                matched = isinstance(event, accept)
            else:
                matched = callable(accept) and accept(event)
            return event if matched else False
        except errors.CaspiaError:
            return False

    return await client.await_broadcast(_accept, timeout=timeout)
def parse_broadcast(broadcast):
    """ Creates specific subclass of Event from Broadcast.

    :param broadcast: object with a ``group`` attribute; bit ``0x100`` marks
        a component broadcast and the low byte selects the component type.
    :return: result of the matching component class' ``parse_broadcast``.
    :raises NotImplementedError: for broadcasts without the ``0x100`` bit.
    :raises errors.InvalidComponentError: if the component type is unknown.
    """
    if not broadcast.group & 0x100:
        # only component broadcasts are supported
        raise NotImplementedError

    comp_type = broadcast.group & 0xFF
    known_classes = Component.component_classes()
    if comp_type not in known_classes:
        raise errors.InvalidComponentError
    return known_classes[comp_type].parse_broadcast(broadcast)
def hwid_read(src):
    """Parse a textual HWID into its 16 raw bytes.

    Every character that is not a hexadecimal digit (separators, whitespace,
    newlines, ...) is ignored; letters are matched case-insensitively.

    :param src: a ``str``, or an object whose ``read()`` returns the text.
    :return: ``bytes`` of length 16.
    :raises ValueError: if the text does not contain exactly 32 hex digits.
    """
    text = src if isinstance(src, str) else src.read()
    # Compare against an explicit ASCII set: str.isdigit() also accepts
    # non-ASCII digits, which bytes.fromhex() cannot parse.
    hex_digits = ''.join(c for c in text.lower() if c in '0123456789abcdef')
    if len(hex_digits) != 32:
        raise ValueError('Invalid length of HWID (should be 32 hex digits).')
    return bytes.fromhex(hex_digits)