Source code for dhcpkit.ipv6.server.extensions.leasequery

"""
Implementation of the Leasequery and Bulk Leasequery extensions.
"""
import codecs
import logging
from ipaddress import IPv6Address, IPv6Network
from typing import Iterable, Iterator, List, Optional, Tuple, Union

from dhcpkit.ipv6.duids import DUID
from dhcpkit.ipv6.extensions.bulk_leasequery import LeasequeryDataMessage, LeasequeryDoneMessage, RelayIdOption, \
    STATUS_QUERY_TERMINATED
from dhcpkit.ipv6.extensions.leasequery import ClientDataOption, LQClientLink, LQQueryOption, LQRelayDataOption, \
    LeasequeryMessage, STATUS_NOT_ALLOWED, STATUS_UNKNOWN_QUERY_TYPE
from dhcpkit.ipv6.extensions.prefix_delegation import IAPDOption, IAPrefixOption, OPTION_IAPREFIX, OPTION_IA_PD
from dhcpkit.ipv6.extensions.remote_id import RemoteIdOption
from dhcpkit.ipv6.messages import Message, RelayForwardMessage, ReplyMessage
from dhcpkit.ipv6.options import IAAddressOption, IANAOption, IATAOption, OPTION_CLIENTID, OPTION_IAADDR, \
    OPTION_IA_NA, OPTION_IA_TA, OPTION_ORO, OPTION_RELAY_MSG, OPTION_SERVERID, OPTION_STATUS_CODE, Option, \
    STATUS_SUCCESS, STATUS_UNSPEC_FAIL, StatusCodeOption
from dhcpkit.ipv6.server.handlers import Handler, ReplyWithLeasequeryError
from dhcpkit.ipv6.server.transaction_bundle import MessagesList, TransactionBundle

logger = logging.getLogger(__name__)


[docs]def create_cleanup_handlers() -> List[Handler]: """ Create handlers to handle unhandled queries :return: Handlers to add to the handler chain """ return [ UnansweredLeasequeryHandler(), ]
[docs]class UnansweredLeasequeryHandler(Handler): """ When there are leasequeries that haven't been handled at the end of the handling phase that means that no handler understood the query. """
[docs] def post(self, bundle: TransactionBundle): """ Check for unhandled leasequeries. :param bundle: The transaction bundle """ if not isinstance(bundle.request, LeasequeryMessage): # Only leasequeries are relevant return unhandled_queries = bundle.get_unhandled_options(LQQueryOption) if unhandled_queries: query = unhandled_queries[0] raise ReplyWithLeasequeryError(STATUS_UNKNOWN_QUERY_TYPE, "This server can't handle query type {}".format(query.query_type))
[docs]class LeasequeryStore: """ Base class for leasequery stores """ def __init__(self): """ The main initialisation will be done in the master process. After initialisation the master process will create worker processes using the multiprocessing module. Things that can't be pickled and transmitted to the worker processes (think database connections etc) have to be initialised separately. Each worker process will call worker_init() to do so. Filters that don't need per-worker initialisation can do everything here in __init__(). """ self.sensitive_options = []
[docs] def worker_init(self, sensitive_options: Iterable[int]): """ Separate initialisation that will be called in each worker process that is created. Things that can't be forked (think database connections etc) have to be initialised here. :param sensitive_options: The options that are not allowed to be stored """ self.sensitive_options = list(sensitive_options or [])
def __str__(self): """ Return a representation of this store for logging purposes :return: A descriptive string """ # Use the class name as default, let subclasses overrule this where it makes sense return self.__class__.__name__
[docs] def remember_lease(self, bundle: TransactionBundle): """ Remember the leases in the given transaction bundle so they can be queried later. :param bundle: The transaction to remember """ raise NotImplementedError
[docs] def find_leases(self, query: LQQueryOption) -> Tuple[int, Iterable[Tuple[IPv6Address, ClientDataOption]]]: """ Find all leases that match the given query. :param query: The query :return: The number of leases and an iterator over tuples of link-address and corresponding client data """ raise NotImplementedError
[docs] @staticmethod def filter_options(options: Iterable[Option], unwanted_option_types: Iterable[int]) -> Iterable[Option]: """ Remove unwanted data from the options. :param options: The options to filter :param unwanted_option_types: List of option types to filter out :return: The filtered options """ # Scrub the output so that we don't return options that are marked as sensitive or otherwise unwanted return [option for option in options if option.option_type not in unwanted_option_types]
[docs] def filter_sensitive_options(self, options: Iterable[Option]) -> Iterable[Option]: """ Remove sensitive data from the options. :param options: The options to filter :return: The filtered options """ return self.filter_options(options, self.sensitive_options)
[docs] def filter_storable_options(self, options: Iterable[Option]) -> Iterable[Option]: """ Only include storable data from the options. :param options: The options to filter :return: The filtered options """ return self.filter_options( options, self.sensitive_options + [ OPTION_CLIENTID, OPTION_SERVERID, OPTION_RELAY_MSG, OPTION_ORO, OPTION_IA_NA, OPTION_IA_TA, OPTION_IA_PD, OPTION_IAADDR, OPTION_IAPREFIX, OPTION_STATUS_CODE ] )
[docs] def filter_requested_options(self, options: Iterable[Option], requested_options: Iterable[int]): """ Only return options that are requested by the leasequery client. :param options: The original list of options :param requested_options: The list of requested options :return: The filtered list """ if not requested_options: # No options requested, no options returned return [] return [option for option in options if option.option_type in requested_options and option.option_type not in self.sensitive_options]
[docs] @staticmethod def is_accepted(element: Union[ReplyMessage, IANAOption, IATAOption, IAPDOption]) -> bool: """ Check if there is no status code that signals rejection. :param element: The element to look in :return: Whether the status is ok """ status = element.get_option_of_type(StatusCodeOption) if not status: return True return status.status_code == STATUS_SUCCESS
[docs] @staticmethod def encode_duid(duid: DUID) -> str: """ Encode DUID as a string. :param duid: The DUID object :return: The string representing the DUID """ return codecs.encode(duid.save(), 'hex').decode('ascii')
[docs] @staticmethod def decode_duid(duid_str: str) -> DUID: """ Decode DUID from a string. :param duid_str: The DUID string :return: The DUID object """ duid_bytes = bytes.fromhex(duid_str) duid_len, duid = DUID.parse(duid_bytes, length=len(duid_bytes)) return duid
[docs] @staticmethod def encode_remote_id(remote_id_option: RemoteIdOption) -> str: """ Encode remote id as a string. :param remote_id_option: The remote-id option :return: The string representing the remote-id """ return "{}:{}".format(remote_id_option.enterprise_number, codecs.encode(remote_id_option.remote_id, 'hex').decode('ascii'))
[docs] @staticmethod def decode_remote_id(remote_id_str: str) -> RemoteIdOption: """ Decode remote id from a string. :param remote_id_str: The remote-id string :return: The remote-id option """ parts = remote_id_str.split(':', maxsplit=1) enterprise_number = int(parts[0]) remote_id = bytes.fromhex(parts[1]) return RemoteIdOption(enterprise_number, remote_id)
[docs] def get_remote_ids(self, bundle: TransactionBundle) -> Iterator[str]: """ Go through all the relay messages and return all remote-ids found as lowercase hex strings :param bundle: The transaction bundle :return: The remote-ids as hex strings """ for relay_message in bundle.incoming_relay_messages: for remote_id_option in relay_message.get_options_of_type(RemoteIdOption): yield self.encode_remote_id(remote_id_option)
[docs] @staticmethod def get_relay_ids(bundle: TransactionBundle) -> Iterator[str]: """ Go through all the relay messages and return all relay-ids found as lowercase hex strings :param bundle: The transaction bundle :return: The relay-ids as hex strings """ for relay_message in bundle.incoming_relay_messages: for relay_id_option in relay_message.get_options_of_type(RelayIdOption): yield codecs.encode(relay_id_option.duid, 'hex').decode('ascii')
[docs] def get_address_leases(self, bundle: TransactionBundle) -> Iterator[IAAddressOption]: """ Search through the reply and return all addresses given to the client. :param bundle: The transaction bundle :return: The address options """ for option in bundle.response.get_options_of_type(IANAOption, IATAOption): if not self.is_accepted(option): continue yield from option.get_options_of_type(IAAddressOption)
[docs] def get_prefix_leases(self, bundle: TransactionBundle) -> Iterator[IAPrefixOption]: """ Search through the reply and return all prefixes given to the client. :param bundle: The transaction bundle :return: The prefix options """ for option in bundle.response.get_options_of_type(IAPDOption): if not self.is_accepted(option): continue yield from option.get_options_of_type(IAPrefixOption)
[docs] def encode_options(self, options: Iterable[Option]) -> bytes: """ Encode a list of options as bytes. :param options: The list of options :return: The bytes """ out = b'' for option in self.filter_storable_options(options): out += option.save() return out
[docs] @staticmethod def decode_options(data: bytes) -> Iterable[Option]: """ Decode a list of options from bytes. :param data: The bytes :return: The list of options """ options = [] max_length = len(data) offset = 0 while max_length > offset: used_buffer, option = Option.parse(data, offset=offset, length=max_length - offset) options.append(option) offset += used_buffer return options
[docs] def encode_relay_messages(self, relay_chain: Optional[RelayForwardMessage]) -> bytes: """ Encode a chain of relay messages as bytes. :param relay_chain: The incoming relay messages :return: The bytes """ if not relay_chain: return b'' current_in = relay_chain current_out = None out = None while isinstance(current_in, RelayForwardMessage): new_relay_message = RelayForwardMessage(hop_count=current_in.hop_count, link_address=current_in.link_address, peer_address=current_in.peer_address, options=self.filter_storable_options(current_in.options)) if not current_out: out = new_relay_message else: current_out.relayed_message = new_relay_message current_in = current_in.relayed_message current_out = new_relay_message # Save the resulting chain if not out: return b'' return out.save()
[docs] @staticmethod def decode_relay_messages(data: bytes) -> Optional[RelayForwardMessage]: """ Decode a chain of relay messages from bytes. :param data: The bytes :return: The relay message """ if not data: return None return Message.parse(data)[1]
[docs] def build_relay_data_option_from_relay_data(self, relay_data: bytes) -> Optional[LQRelayDataOption]: """ The relay data includes the outer relay message, which is generated inside the server to keep track of where we got the request from. When returning relay data to the leasequery client we build the LQRelayDataOption using this internal relay message only including the real relay messages we received. :param relay_data: The raw relay data :return: The LQRelayDataOption if applicable """ if not relay_data: return None relay_chain = self.decode_relay_messages(relay_data) # The outer relay message contains the peer address we need peer_address = relay_chain.peer_address # Go up one position in the relay chain relay_chain = relay_chain.relayed_message if not isinstance(relay_chain, RelayForwardMessage): # We only had the internal relay message, so we didn't receive this client's request through a relay return None return LQRelayDataOption(peer_address, relay_chain)
[docs]class LeasequeryHandler(Handler): """ Handle leasequery requests and analyse replies that we send out to store any observed leases. """ def __init__(self, store: LeasequeryStore, allow_from: Iterable[IPv6Network] = None, sensitive_options: Iterable[int] = None): super().__init__() self.store = store self.allow_from = list(allow_from or []) self.sensitive_options = list(sensitive_options or [])
[docs] def worker_init(self): """ Make sure the store gets a chance to initialise itself. """ self.store.worker_init(self.sensitive_options)
[docs] def pre(self, bundle: TransactionBundle): """ Make sure we allow this client to make leasequery requests. :param bundle: The transaction bundle """ if not isinstance(bundle.request, LeasequeryMessage): # Not a leasequery, not our business return # Check access based on relay closest to the client if not any([bundle.incoming_relay_messages[0].peer_address in allow_from for allow_from in self.allow_from]): raise ReplyWithLeasequeryError(STATUS_NOT_ALLOWED, "Leasequery not allowed from your address")
[docs] @staticmethod def generate_data_messages(transaction_id: bytes, leases: Iterator[Tuple[IPv6Address, ClientDataOption]]) \ -> Iterator[Union[LeasequeryDataMessage, LeasequeryDoneMessage]]: """ Generate a leasequery data message for each of the leases, followed by a leasequery done message. :param transaction_id: The transaction ID to use in the messages :param leases: An open iterator for the data we still need to return :return: Leasequery messages to send to the client """ for link_address, data_option in leases: yield LeasequeryDataMessage(transaction_id, options=[data_option]) yield LeasequeryDoneMessage(transaction_id, options=[ StatusCodeOption(STATUS_SUCCESS, "That's all folks") ])
[docs] def handle(self, bundle: TransactionBundle): """ Perform leasequery if requested. :param bundle: The transaction bundle """ if not isinstance(bundle.request, LeasequeryMessage): # Not a leasequery, not our business return # Extract the query queries = bundle.get_unhandled_options(LQQueryOption) if not queries: # No unhandled queries return query = queries[0] # Get the leases from the store lease_count, leases = self.store.find_leases(query) # A count of -1 means unsupported query, so we stop handling if lease_count < 0: return # Otherwise mark this query as handled bundle.mark_handled(query) # What we do now depends on the protocol if bundle.received_over_tcp: try: if lease_count > 0: # We're doing bulk leasequery, return all the records in separate messages leases_iterator = iter(leases) first_link_address, first_data_option = next(leases_iterator) first_message = bundle.response first_message.options.append(first_data_option) bundle.responses = MessagesList(first_message, self.generate_data_messages(first_message.transaction_id, leases_iterator)) else: # If the server does not find any bindings satisfying a query, it # SHOULD send a LEASEQUERY-REPLY without an OPTION_STATUS_CODE option # and without any OPTION_CLIENT_DATA option. pass except: # Something went wrong (database changes while reading?), abort logger.exception("Error while building bulk leasequery response") raise ReplyWithLeasequeryError(STATUS_QUERY_TERMINATED, "Error constructing your reply, please try again") else: try: if lease_count == 1: # One entry found, return it leases_iterator = iter(leases) first_link_address, first_data_option = next(leases_iterator) bundle.response.options.append(first_data_option) elif lease_count > 1: # The Client Link option is used only in a LEASEQUERY-REPLY message and # identifies the links on which the client has one or more bindings. # It is used in reply to a query when no link-address was specified and # the client is found to be on more than one link. link_addresses = set([link_address for link_address, data_option in leases]) bundle.response.options.append(LQClientLink(link_addresses)) except: # Something went wrong (database changes while reading?), abort logger.exception("Error while building leasequery response") raise ReplyWithLeasequeryError(STATUS_UNSPEC_FAIL, "Error constructing your reply, please try again")
[docs] def analyse_post(self, bundle: TransactionBundle): """ Watch outgoing replies and store observed leases in the store. :param bundle: The transaction bundle containing the outgoing reply """ if isinstance(bundle.response, ReplyMessage): # We're only interested in replies, advertise messages don't give a lease. Rapid commit will have turned # this into a reply when used, so checking for replies is enough. self.store.remember_lease(bundle)