Source code for dhcpkit.ipv6.server.listeners.multicast_interface.config

"""
Implementation of a listener on a local multicast network interface
"""
import logging
import netifaces
import socket
from ipaddress import IPv6Address
from struct import pack
from typing import Iterable

from dhcpkit.ipv6 import All_DHCP_Relay_Agents_and_Servers
from dhcpkit.ipv6.server.listeners import Listener
from dhcpkit.ipv6.server.listeners.factories import UDPListenerFactory
from dhcpkit.ipv6.server.listeners.udp import UDPListener
from dhcpkit.ipv6.utils import is_global_unicast

logger = logging.getLogger(__name__)


[docs]class MulticastInterfaceUDPListenerFactory(UDPListenerFactory): """ Factory for the implementation of a listener on a local multicast network interface """ # noinspection PyTypeChecker name_datatype = staticmethod(str)
[docs] def validate_config_section(self): """ Validate the interface information """ try: socket.if_nametoindex(self.name) except OSError: raise ValueError("Interface {} not found".format(self.name)) interface_addresses = [IPv6Address(addr_info['addr'].split('%')[0]) for addr_info in netifaces.ifaddresses(self.name).get(netifaces.AF_INET6, [])] # Pick the first link-local address as reply-from if none is specified in the configuration if not self.section.reply_from: for address in interface_addresses: if address.is_link_local: self.section.reply_from = address break if not self.section.reply_from: raise ValueError("No link-local address found on interface {}".format(self.name)) else: # Validate what the user supplied if not self.section.reply_from.is_link_local: raise ValueError("The reply-from address must be a link-local address") if self.section.reply_from not in interface_addresses: raise ValueError("Cannot find reply-from address {} on interface {}".format(self.section.reply_from, self.name)) # Pick the first global unicast address as link-address if none is specified in the configuration if not self.section.link_address: for address in interface_addresses: if is_global_unicast(address): self.section.link_address = address break if not self.section.link_address: # Use the unspecified address is we couldn't find anything self.section.link_address = IPv6Address('::') else: # Validate what the user supplied (we don't really care if it exists, it's just extra information for the # option handlers if not is_global_unicast(self.section.link_address): raise ValueError("The link-address must be a global unicast address")
[docs] def create(self, old_listeners: Iterable[Listener] = None) -> UDPListener: """ Create a listener of this class based on the configuration in the config section. :param old_listeners: A list of existing listeners in case we can recycle them :return: A listener object """ mc_address = All_DHCP_Relay_Agents_and_Servers interface_index = socket.if_nametoindex(self.name) # Try recycling old_listeners = list(old_listeners or []) for old_listener in old_listeners: if not isinstance(old_listener, UDPListener): continue if self.match_socket(sock=old_listener.listen_socket, address=mc_address, interface=interface_index): logger.debug("Recycling existing multicast socket on {}".format(self.name)) mc_sock = old_listener.listen_socket break else: logger.debug("Listening for multicast requests on {}".format(self.name)) mc_sock = socket.socket(socket.AF_INET6, self.sock_type, self.sock_proto) mc_sock.bind((str(mc_address), self.listen_port, 0, interface_index)) mc_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, pack('16sI', mc_address.packed, interface_index)) # Set the socket options mc_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_LOOP, self.listen_to_self and 1 or 0) for old_listener in old_listeners: if not isinstance(old_listener, UDPListener): continue if self.match_socket(sock=old_listener.listen_socket, address=self.reply_from, interface=interface_index): logger.debug(" - Recycling existing reply socket for {} on {}".format(self.reply_from, self.name)) ll_sock = old_listener.listen_socket break if self.match_socket(sock=old_listener.reply_socket, address=self.reply_from, interface=interface_index): logger.debug(" - Recycling existing reply socket for {} on {}".format(self.reply_from, self.name)) ll_sock = old_listener.reply_socket break else: logger.debug(" - Sending replies from {}".format(self.reply_from)) ll_sock = socket.socket(socket.AF_INET6, self.sock_type, self.sock_proto) ll_sock.bind((str(self.reply_from), self.listen_port, 0, interface_index)) return UDPListener(interface_name=self.name, listen_socket=mc_sock, reply_socket=ll_sock, global_address=self.link_address, marks=self.marks)