import logging
from collections import OrderedDict
from dataclasses import dataclass
from pathlib import Path
from typing import Dict
import yaml
import caspia.toolbox.name
from caspia.gateway import errors
MAX_CAN_ID = 2**9 - 1
CanID = int
logger = logging.getLogger(__name__)
[docs]@dataclass
class Reservation:
"""
Can ID reservation.
When a node has a cid reservation, it means, that there is a preference
of that node having the cid in future. No guarantees included.
On the other side, if the reservation is leased, it is guaranted that the
cid won't be assigned to some other node.
"""
cid: int
node: str
leased: bool
[docs]class CidManagement:
"""
Manages cid assignments on a can network.
CAN ID lease (assignment) process:
#. The search of available cids is narrowed to cids allowed
for that node.
#. If there is an existing lease for the node within the allowed cids
range, that cid is assigned/leased again.
#. If there is a cid reservation for that node, that cid will be
leased to the node.
#. If there is any available cid, that cid will be leased to the node.
#. If there is cid reservation for some other node (which is not leased yet),
that cid will be leased to the node.
#. Otherwise CanIDReservationError will be raised.
"""
def __init__(self, cache_path: Path, config=None, maxcid=MAX_CAN_ID):
self.cache_path = cache_path
self.maxcid = maxcid
self.reservations: Dict[CanID:Reservation] = dict()
self.allowed_canids = OrderedDict() # node pattern re (compiled): cid range
self._load_cached()
if config:
self.configure(config)
def _load_cached(self):
if not self.cache_path.exists():
return
with open(self.cache_path, 'r') as f:
reservations = yaml.load(f)
for name, cid in reservations.items():
try:
self.reserve(name, cid)
except errors.CanIDReservationError as e:
logger.warning('could not reserve cid 0x%03X for %r: %s', cid, name, e)
def _save_cache(self):
with open(self.cache_path, 'w') as f:
for reservation in self.reservations.values():
line = '{}: 0x{:03X}\n'.format(reservation.node, reservation.cid)
f.write(line)
[docs] def save(self):
self._save_cache()
[docs] def name_of(self, cid, *, reserved=False):
"""
Return name of a node having assigned given cid.
By default searches for nodes having this cid leased.
If `reserved` is `True`, searches also for nodes having the cid reserved.
"""
reservation = self.reservations.get(cid, None)
if not reservation or (not reservation.leased and not reserved):
return None
return reservation.node
[docs] def cid_of(self, name, *, reserved=False):
"""
Return cid of a node with given name.
By default searches for nodes having this cid leased.
If `reserved` is `True`, searches also for nodes having the cid reserved.
"""
for cid, reservation in self.reservations.items():
if reservation.node == name and (reservation.leased or reserved):
return cid
return None
[docs] def is_reserved(self, cid):
return cid in self.reservations
[docs] def is_leased(self, cid):
return cid in self.reservations and self.reservations[cid].leased
[docs] def available_cids_for(self, name):
# first list already reserver/leased cid
cid = self.cid_of(name, reserved=True)
if cid is not None:
for pattern_re, cid_range in self.allowed_canids.items():
if pattern_re.fullmatch(name) and cid in cid_range:
yield cid
# iterate through allowed cids that are not leased nor reserved
for pattern_re, cid_range in self.allowed_canids.items():
if pattern_re.fullmatch(name):
yield from (cid for cid in cid_range
if not self.is_leased(cid) and not self.is_reserved(cid))
# iterate through allowed cids that are not leased only
for pattern_re, cid_range in self.allowed_canids.items():
if pattern_re.fullmatch(name):
yield from (cid for cid in cid_range if not self.is_leased(cid))
[docs] def first_available_cid_for(self, name):
try:
return next(self.available_cids_for(name))
except StopIteration:
return None
[docs] def allow_cids(self, pattern, cid_range):
pattern_re = caspia.toolbox.name.create_pattern_re(pattern)
self.allowed_canids[pattern_re] = cid_range
[docs] def lease(self, name, cid=None):
if cid is not None and (cid < 0 or cid > MAX_CAN_ID):
raise errors.CanIDReservationError('invalid cid %s' % cid)
if self.is_leased(cid) and self.reservations[cid].node != name:
raise errors.CanIDReservationError('cid 0x%03X is already leased' % cid)
if cid is None:
cid = self.first_available_cid_for(name)
if cid is None:
raise errors.CanIDReservationError('no available cid for node %s' % name)
for pattern_re, cid_range in self.allowed_canids.items():
if pattern_re.fullmatch(name) and cid in cid_range:
break
else:
raise errors.CanIDReservationError('cid 0x%03X is not allowed' % cid)
reservation = self.reservations.get(cid, None) or Reservation(
node=name, cid=cid, leased=True)
reservation.node = name
reservation.leased = True
self.reservations[cid] = reservation
return cid
[docs] def reserve(self, name, cid):
if cid < 0 or cid > MAX_CAN_ID:
raise errors.CanIDReservationError('invalid CID %s' % cid)
if cid in self.reservations and self.reservations[cid].node == name:
return
if self.is_leased(cid):
msg = 'could not reserve cid 0x%03X for %r, because it\'s already leased'
raise errors.CanIDReservationError(msg % (cid, name))
self.reservations[cid] = Reservation(node=name, cid=cid, leased=False)