Source code for dhcpkit.ipv6.server.extensions.static_assignments.csv

"""
An option handler that assigns addresses based on DUID from a CSV file
"""
import codecs
import csv
import logging
from ipaddress import IPv6Address, IPv6Network

from dhcpkit.ipv6.duids import DUID
from dhcpkit.ipv6.extensions.linklayer_id import LinkLayerIdOption
from dhcpkit.ipv6.extensions.remote_id import RemoteIdOption
from dhcpkit.ipv6.extensions.subscriber_id import SubscriberIdOption
from dhcpkit.ipv6.options import ClientIdOption, InterfaceIdOption
from dhcpkit.ipv6.server.extensions.static_assignments import Assignment, StaticAssignmentHandler
from dhcpkit.ipv6.server.transaction_bundle import TransactionBundle
from dhcpkit.utils import normalise_hex
from typing import List, Mapping, Tuple

logger = logging.getLogger(__name__)


[docs]class CSVStaticAssignmentHandler(StaticAssignmentHandler): """ Assign addresses and/or prefixes based on the contents of a CSV file """ def __init__(self, filename: str, address_preferred_lifetime: int, address_valid_lifetime: int, prefix_preferred_lifetime: int, prefix_valid_lifetime: int): """ Initialise the mapping. This handler will respond to clients on responsible_for_links and assume that all addresses in the mapping are appropriate for on those links. :param filename: The filename containing the CSV data """ super().__init__(address_preferred_lifetime, address_valid_lifetime, prefix_preferred_lifetime, prefix_valid_lifetime) self.filename = filename self.mapping = self.read_csv_file(filename) def __str__(self): return "{} from {}".format(self.__class__.__name__, self.filename)
[docs] def get_assignment(self, bundle: TransactionBundle) -> Assignment: """ Look up the assignment based on DUID, Interface-ID of the relay closest to the client and Remote-ID of the relay closest to the client, in that order. :param bundle: The transaction bundle :return: The assignment, if any """ # Look up based on DUID duid_option = bundle.request.get_option_of_type(ClientIdOption) duid = 'duid:' + codecs.encode(duid_option.duid.save(), 'hex').decode('ascii') if duid in self.mapping: return self.mapping[duid] # Look up based on Interface-ID interface_id_option = bundle.incoming_relay_messages[0].get_option_of_type(InterfaceIdOption) if interface_id_option: interface_id = 'interface-id:' + codecs.encode(interface_id_option.interface_id, 'hex').decode('ascii') if interface_id in self.mapping: return self.mapping[interface_id] # Look up based on Remote-ID remote_id_option = bundle.incoming_relay_messages[0].get_option_of_type(RemoteIdOption) if remote_id_option: remote_id = 'remote-id:{}:{}'.format(remote_id_option.enterprise_number, codecs.encode(remote_id_option.remote_id, 'hex').decode('ascii')) if remote_id in self.mapping: return self.mapping[remote_id] # Look up based on Subscriber-ID subscriber_id_option = bundle.incoming_relay_messages[0].get_option_of_type(SubscriberIdOption) if subscriber_id_option: subscriber_id = 'subscriber-id:{}'.format( codecs.encode(subscriber_id_option.subscriber_id, 'hex').decode('ascii') ) if subscriber_id in self.mapping: return self.mapping[subscriber_id] # Look up based on LinkLayer-ID linklayer_id_option = bundle.incoming_relay_messages[0].get_option_of_type(LinkLayerIdOption) if linklayer_id_option: linklayer_id = 'linklayer-id:{}:{}'.format( linklayer_id_option.link_layer_type, codecs.encode(linklayer_id_option.link_layer_address, 'hex').decode('ascii') ) if linklayer_id in self.mapping: return self.mapping[linklayer_id] # Nothing found return Assignment(address=None, prefix=None)
[docs] def read_csv_file(self, csv_filename: str) -> Mapping[str, Assignment]: """ Read the assignments from the file specified in the configuration :param csv_filename: The filename of the CSV file :return: A dictionary mapping identifiers to assignments """ assignments = dict(self.parse_csv_file(csv_filename)) logger.info("Loaded {} assignments from {}".format(len(assignments), csv_filename)) return assignments
[docs] @staticmethod def parse_csv_file(csv_filename: str) -> List[Tuple[str, Assignment]]: """ Read the assignments from the file specified in the configuration :param csv_filename: The filename of the CSV file :return: An list of identifiers and their assignment """ logger.debug("Loading assignments from {}".format(csv_filename)) with open(csv_filename) as csv_file: # Auto-detect the CSV dialect sniffer = csv.Sniffer() sample = csv_file.read(10240) dialect = sniffer.sniff(sample) # Restart and parse csv_file.seek(0) reader = csv.DictReader(csv_file, dialect=dialect) # First line is column headings for row in reader: try: address_str = row['address'].strip() address = address_str and IPv6Address(address_str) or None prefix_str = row['prefix'].strip() prefix = prefix_str and IPv6Network(prefix_str) or None # Validate and normalise id input row_id = row['id'] if row_id.startswith('duid:'): duid_hex = row_id.split(':', 1)[1] duid_bytes = codecs.decode(duid_hex, 'hex') length, duid = DUID.parse(duid_bytes, length=len(duid_bytes)) duid_hex = codecs.encode(duid.save(), 'hex').decode('ascii') row_id = 'duid:{}'.format(duid_hex) elif row_id.startswith('interface-id:'): interface_id_hex = row_id.split(':', 1)[1] interface_id_hex = normalise_hex(interface_id_hex) interface_id = codecs.decode(interface_id_hex, 'hex') interface_id_hex = codecs.encode(interface_id, 'hex').decode('ascii') row_id = 'interface-id:{}'.format(interface_id_hex) elif row_id.startswith('interface-id-str:'): interface_id = row_id.split(':', 1)[1] interface_id_hex = codecs.encode(interface_id.encode('ascii'), 'hex').decode('ascii') row_id = 'interface-id:{}'.format(interface_id_hex) elif row_id.startswith('remote-id:') or row_id.startswith('remote-id-str:'): remote_id_data = row_id.split(':', 1)[1] try: enterprise_id, remote_id = remote_id_data.split(':', 1) enterprise_id = int(enterprise_id) if row_id.startswith('remote-id:'): remote_id = normalise_hex(remote_id) remote_id = codecs.decode(remote_id, 'hex') else: remote_id = remote_id.encode('ascii') row_id = 'remote-id:{}:{}'.format(enterprise_id, codecs.encode(remote_id, 'hex').decode('ascii')) except ValueError: raise ValueError("Remote-ID must be formatted as 'remote-id:<enterprise>:<remote-id-hex>', " "for example: 'remote-id:9:0123456789abcdef") elif row_id.startswith('subscriber-id:'): subscriber_id_hex = row_id.split(':', 1)[1] subscriber_id_hex = normalise_hex(subscriber_id_hex) subscriber_id = codecs.decode(subscriber_id_hex, 'hex') subscriber_id_hex = codecs.encode(subscriber_id, 'hex').decode('ascii') row_id = 'subscriber-id:{}'.format(subscriber_id_hex) elif row_id.startswith('subscriber-id-str:'): subscriber_id = row_id.split(':', 1)[1] subscriber_id_hex = codecs.encode(subscriber_id.encode('ascii'), 'hex').decode('ascii') row_id = 'subscriber-id:{}'.format(subscriber_id_hex) elif row_id.startswith('linklayer-id:') or row_id.startswith('linklayer-id-str:'): linklayer_id_data = row_id.split(':', 1)[1] try: linklayer_type, linklayer_id = linklayer_id_data.split(':', 1) linklayer_type = int(linklayer_type) if row_id.startswith('linklayer-id:'): linklayer_id = normalise_hex(linklayer_id) linklayer_id = codecs.decode(linklayer_id, 'hex') else: linklayer_id = linklayer_id.encode('ascii') row_id = 'linklayer-id:{}:{}'.format(linklayer_type, codecs.encode(linklayer_id, 'hex').decode('ascii')) except ValueError: raise ValueError("LinkLayer-ID must be formatted as 'linklayer-id:<type>:<address-hex>', " "for example: 'linklayer-id:1:002436ef1d89") else: raise ValueError("Unsupported ID type, supported types: duid, interface-id, interface-id-str," "remote-id, remote-id-str, subscriber-id, subscriber-id-str, linklayer-id and" "linklayer-id-str") # Store the normalised id logger.debug("Loaded assignment for {}".format(row_id)) yield row_id, Assignment(address=address, prefix=prefix) except KeyError: raise ValueError("Assignment CSV must have columns 'id', 'address' and 'prefix'") except ValueError as e: logger.error("Ignoring {} line {} with invalid value: {}".format(csv_file, reader.line_num, e))