Source code for caspia.gateway.cidmanagement

import logging
from collections import OrderedDict
from dataclasses import dataclass
from pathlib import Path
from typing import Dict

import yaml

import caspia.toolbox.name
from caspia.gateway import errors

MAX_CAN_ID = 2**9 - 1

CanID = int
logger = logging.getLogger(__name__)


[docs]@dataclass class Reservation: """ Can ID reservation. When a node has a cid reservation, it means, that there is a preference of that node having the cid in future. No guarantees included. On the other side, if the reservation is leased, it is guaranted that the cid won't be assigned to some other node. """ cid: int node: str leased: bool
[docs]class CidManagement: """ Manages cid assignments on a can network. CAN ID lease (assignment) process: #. The search of available cids is narrowed to cids allowed for that node. #. If there is an existing lease for the node within the allowed cids range, that cid is assigned/leased again. #. If there is a cid reservation for that node, that cid will be leased to the node. #. If there is any available cid, that cid will be leased to the node. #. If there is cid reservation for some other node (which is not leased yet), that cid will be leased to the node. #. Otherwise CanIDReservationError will be raised. """ def __init__(self, cache_path: Path, config=None, maxcid=MAX_CAN_ID): self.cache_path = cache_path self.maxcid = maxcid self.reservations: Dict[CanID:Reservation] = dict() self.allowed_canids = OrderedDict() # node pattern re (compiled): cid range self._load_cached() if config: self.configure(config)
[docs] def configure(self, config): """Configure allowed cids using a config dictionary.""" for name, cid in config.items(): if isinstance(cid, int): self.allow_cids(name, range(cid, cid + 1)) else: start, end = [int(i, 0) for i in cid.split('-')] if start > end: cid_range = range(start, end - 1, -1) else: cid_range = range(start, end + 1) self.allow_cids(name, cid_range)
def _load_cached(self): if not self.cache_path.exists(): return with open(self.cache_path, 'r') as f: reservations = yaml.load(f) for name, cid in reservations.items(): try: self.reserve(name, cid) except errors.CanIDReservationError as e: logger.warning('could not reserve cid 0x%03X for %r: %s', cid, name, e) def _save_cache(self): with open(self.cache_path, 'w') as f: for reservation in self.reservations.values(): line = '{}: 0x{:03X}\n'.format(reservation.node, reservation.cid) f.write(line)
[docs] def save(self): self._save_cache()
[docs] def name_of(self, cid, *, reserved=False): """ Return name of a node having assigned given cid. By default searches for nodes having this cid leased. If `reserved` is `True`, searches also for nodes having the cid reserved. """ reservation = self.reservations.get(cid, None) if not reservation or (not reservation.leased and not reserved): return None return reservation.node
[docs] def cid_of(self, name, *, reserved=False): """ Return cid of a node with given name. By default searches for nodes having this cid leased. If `reserved` is `True`, searches also for nodes having the cid reserved. """ for cid, reservation in self.reservations.items(): if reservation.node == name and (reservation.leased or reserved): return cid return None
[docs] def is_reserved(self, cid): return cid in self.reservations
[docs] def is_leased(self, cid): return cid in self.reservations and self.reservations[cid].leased
[docs] def available_cids_for(self, name): # first list already reserver/leased cid cid = self.cid_of(name, reserved=True) if cid is not None: for pattern_re, cid_range in self.allowed_canids.items(): if pattern_re.fullmatch(name) and cid in cid_range: yield cid # iterate through allowed cids that are not leased nor reserved for pattern_re, cid_range in self.allowed_canids.items(): if pattern_re.fullmatch(name): yield from (cid for cid in cid_range if not self.is_leased(cid) and not self.is_reserved(cid)) # iterate through allowed cids that are not leased only for pattern_re, cid_range in self.allowed_canids.items(): if pattern_re.fullmatch(name): yield from (cid for cid in cid_range if not self.is_leased(cid))
[docs] def first_available_cid_for(self, name): try: return next(self.available_cids_for(name)) except StopIteration: return None
[docs] def allow_cids(self, pattern, cid_range): pattern_re = caspia.toolbox.name.create_pattern_re(pattern) self.allowed_canids[pattern_re] = cid_range
[docs] def lease(self, name, cid=None): if cid is not None and (cid < 0 or cid > MAX_CAN_ID): raise errors.CanIDReservationError('invalid cid %s' % cid) if self.is_leased(cid) and self.reservations[cid].node != name: raise errors.CanIDReservationError('cid 0x%03X is already leased' % cid) if cid is None: cid = self.first_available_cid_for(name) if cid is None: raise errors.CanIDReservationError('no available cid for node %s' % name) for pattern_re, cid_range in self.allowed_canids.items(): if pattern_re.fullmatch(name) and cid in cid_range: break else: raise errors.CanIDReservationError('cid 0x%03X is not allowed' % cid) reservation = self.reservations.get(cid, None) or Reservation( node=name, cid=cid, leased=True) reservation.node = name reservation.leased = True self.reservations[cid] = reservation return cid
[docs] def reserve(self, name, cid): if cid < 0 or cid > MAX_CAN_ID: raise errors.CanIDReservationError('invalid CID %s' % cid) if cid in self.reservations and self.reservations[cid].node == name: return if self.is_leased(cid): msg = 'could not reserve cid 0x%03X for %r, because it\'s already leased' raise errors.CanIDReservationError(msg % (cid, name)) self.reservations[cid] = Reservation(node=name, cid=cid, leased=False)