"""
Source code for dhcpkit.ipv6.server.listeners.udp

UDP implementations of listeners and repliers
"""

import logging
import socket
from ipaddress import IPv6Address
from typing import Iterable, Tuple

from dhcpkit.common.server.logging import DEBUG_PACKETS
from dhcpkit.ipv6 import CLIENT_PORT, SERVER_PORT
from dhcpkit.ipv6.messages import RelayReplyMessage
from dhcpkit.ipv6.options import InterfaceIdOption
from dhcpkit.ipv6.server.listeners import IncomingPacketBundle, Listener, ListeningSocketError, Replier, \
    increase_message_counter

# Module-level logger, named after this module
logger = logging.getLogger(__name__)

class UDPListener(Listener):
    """
    A wrapper for a normal socket that bundles a socket to listen on with a (potentially different) socket to send
    replies from.

    :type interface_name: str
    :type interface_index: int
    :type listen_socket: socket.socket
    :type listen_address: IPv6Address
    :type reply_socket: socket.socket
    :type reply_address: IPv6Address
    :type global_address: IPv6Address
    """

    def __init__(self, interface_name: str, listen_socket: socket.socket, reply_socket: socket.socket = None,
                 global_address: IPv6Address = None, marks: Iterable[str] = None):
        """
        Initialise listener.

        :param interface_name: The name of the interface
        :param listen_socket: The socket we are listening on, may be a unicast or multicast socket
        :param reply_socket: The socket replies are sent from, must be a unicast socket. Defaults to the listen
                             socket when omitted.
        :param global_address: The global address on the listening interface. When omitted the listen address is
                               used, provided it is neither link-local nor multicast.
        :param marks: Marks attached to this listener
        :raises ListeningSocketError: If the sockets are not IPv6 UDP sockets, are not bound to the server port, are
                                      on different interfaces, are bound to the wildcard address, if no global
                                      address can be determined, or if the listen/reply combination is unsupported
        """
        self.interface_name = interface_name
        # The interface name as bytes, used as the content of the interface-id relay option
        self.interface_id = interface_name.encode('utf-8')
        self.listen_socket = listen_socket
        self.reply_socket = reply_socket
        self.marks = list(marks or [])

        # Without a dedicated reply socket we answer from the socket we listen on
        if self.reply_socket is None:
            self.reply_socket = self.listen_socket

        # Check that we have IPv6 UDP sockets
        if self.listen_socket.family != socket.AF_INET6 or self.listen_socket.proto != socket.IPPROTO_UDP \
                or self.reply_socket.family != socket.AF_INET6 or self.reply_socket.proto != socket.IPPROTO_UDP:
            raise ListeningSocketError("Listen and reply sockets have to be IPv6 UDP sockets")

        # For IPv6 sockets getsockname() returns (host, port, flowinfo, scope_id)
        listen_sockname = self.listen_socket.getsockname()
        reply_sockname = self.reply_socket.getsockname()

        # Check that we are on the right port
        if listen_sockname[1] != SERVER_PORT or reply_sockname[1] != SERVER_PORT:
            raise ListeningSocketError("Listen and reply sockets have to be on port {}".format(SERVER_PORT))

        # Check that they are both on the same interface
        if listen_sockname[3] != reply_sockname[3]:
            raise ListeningSocketError("Listen and reply sockets have to be on same interface")

        self.interface_index = listen_sockname[3]

        # Drop anything after a '%' in the host part before parsing the address
        self.listen_address = IPv6Address(listen_sockname[0].split('%')[0])
        self.reply_address = IPv6Address(reply_sockname[0].split('%')[0])

        if global_address:
            self.global_address = global_address
        elif not self.listen_address.is_link_local and not self.listen_address.is_multicast:
            self.global_address = self.listen_address
        else:
            raise ListeningSocketError("Cannot determine global address on interface {}".format(self.interface_name))

        # We only support fixed address binding
        if self.listen_address.is_unspecified or self.reply_address.is_unspecified:
            raise ListeningSocketError("This server only supports listening on explicit address, not on wildcard")

        # Multicast listeners must have link-local reply addresses
        if self.listen_address.is_multicast and not self.reply_address.is_link_local:
            raise ListeningSocketError("Multicast listening addresses need link-local reply socket")

        # Non-multicast listeners need to use a single address
        if not self.listen_address.is_multicast and self.reply_socket != self.listen_socket:
            raise ListeningSocketError("Unicast listening addresses can't use separate reply sockets")

    def recv_request(self) -> Tuple[IncomingPacketBundle, Replier]:
        """
        Receive incoming messages

        :return: The incoming packet data and a replier object
        """
        data, sender = self.listen_socket.recvfrom(65536)

        # Create the message-ID
        message_counter = increase_message_counter()
        message_id = '#{:06X}'.format(message_counter)

        logger.log(DEBUG_PACKETS, "{message_id}: Received message from {client_addr} port {port} on {interface}".format(
            message_id=message_id,
            client_addr=sender[0],
            port=sender[1],
            interface=self.interface_name))

        # Record the receiving interface as a relay option on the bundle
        interface_id_option = InterfaceIdOption(interface_id=self.interface_id)

        packet_bundle = IncomingPacketBundle(message_id=message_id,
                                             data=data,
                                             source_address=IPv6Address(sender[0].split('%')[0]),
                                             link_address=self.global_address,
                                             interface_index=self.interface_index,
                                             received_over_multicast=self.listen_address.is_multicast,
                                             received_over_tcp=False,
                                             marks=self.marks,
                                             relay_options=[interface_id_option])
        replier = UDPReplier(self.reply_socket)

        return packet_bundle, replier

    def fileno(self) -> int:
        """
        The fileno of the listening socket, so this object can be used by select()

        :return: The file descriptor
        """
        return self.listen_socket.fileno()
class UDPReplier(Replier):
    """
    A class to send replies to the client
    """

    def __init__(self, reply_socket: socket.socket):
        """
        Remember the socket that replies are sent from.

        :param reply_socket: The socket to send replies with
        """
        self.reply_socket = reply_socket

    def send_reply(self, outgoing_message: RelayReplyMessage) -> bool:
        """
        Send a reply to the client

        :param outgoing_message: The message to send, including a wrapping RelayReplyMessage
        :return: Whether sending was successful
        """
        # Determine network addresses and bytes
        reply = outgoing_message.relayed_message

        # A reply that is itself a relay-reply goes to the server port, anything else to the client port
        port = SERVER_PORT if isinstance(reply, RelayReplyMessage) else CLIENT_PORT
        destination_address = str(outgoing_message.peer_address)

        # NOTE(review): serialising with reply.save() is assumed from the project's message API - confirm
        data = reply.save()

        # Try to determine the interface index from the outgoing relay options
        interface_index = 0
        interface_name = 'unknown'
        interface_id_option = outgoing_message.get_option_of_type(InterfaceIdOption)
        if interface_id_option:
            try:
                interface_name = interface_id_option.interface_id.decode(encoding='utf-8', errors='replace')
                interface_index = socket.if_nametoindex(interface_id_option.interface_id)
            except OSError:
                # Best effort: keep the decoded name for logging and fall back to interface index 0
                pass

        destination = (destination_address, port, 0, interface_index)
        sent_length = self.reply_socket.sendto(data, destination)
        success = len(data) == sent_length

        # Both log lines report the same details, only the wording and level differ
        log_details = dict(message_type=outgoing_message.inner_message.__class__.__name__,
                           client_addr=destination_address,
                           port=port,
                           interface=interface_name)

        if success:
            logger.log(DEBUG_PACKETS,
                       "Sent {message_type} to {client_addr} port {port} on {interface}".format(**log_details))
        else:
            logger.error(
                "Could not send {message_type} to {client_addr} port {port} on {interface}".format(**log_details))

        return success