Source code for dhcpkit.ipv6.server.message_handler

"""
The code to handle a message
"""
import logging
import multiprocessing
from typing import Iterable, List, Optional

from dhcpkit.common.server.logging import DEBUG_HANDLING
from dhcpkit.ipv6.duids import DUID
from dhcpkit.ipv6.extensions.leasequery import LeasequeryMessage, LeasequeryReplyMessage, STATUS_MALFORMED_QUERY, \
    STATUS_NOT_ALLOWED, STATUS_UNKNOWN_QUERY_TYPE
from dhcpkit.ipv6.extensions.prefix_delegation import IAPDOption, IAPrefixOption
from dhcpkit.ipv6.messages import AdvertiseMessage, ConfirmMessage, DeclineMessage, InformationRequestMessage, \
    RebindMessage, ReleaseMessage, RenewMessage, ReplyMessage, RequestMessage, SolicitMessage
from dhcpkit.ipv6.options import ClientIdOption, IAAddressOption, IANAOption, IATAOption, STATUS_USE_MULTICAST, \
    ServerIdOption, StatusCodeOption
from dhcpkit.ipv6.server.extension_registry import server_extension_registry
from dhcpkit.ipv6.server.filters import Filter
from dhcpkit.ipv6.server.handlers import CannotRespondError, Handler, ReplyWithLeasequeryError, ReplyWithStatusError, \
    UseMulticastError
from dhcpkit.ipv6.server.handlers.client_id import ClientIdHandler
from dhcpkit.ipv6.server.handlers.interface_id import InterfaceIdOptionHandler
from dhcpkit.ipv6.server.handlers.rapid_commit import RapidCommitHandler
from dhcpkit.ipv6.server.handlers.server_id import ForOtherServerError, ServerIdHandler
from dhcpkit.ipv6.server.handlers.status_option import AddMissingStatusOptionHandler
from dhcpkit.ipv6.server.handlers.unanswered_ia import UnansweredIAOptionHandler
from dhcpkit.ipv6.server.handlers.unicast import RejectUnwantedUnicastHandler
from dhcpkit.ipv6.server.statistics import StatisticsSet
from dhcpkit.ipv6.server.transaction_bundle import TransactionBundle

logger = logging.getLogger(__name__)


[docs]class MessageHandler: """ Message processing class """ def __init__(self, server_id: DUID, sub_filters: Iterable[Filter] = None, sub_handlers: Iterable[Handler] = None, allow_rapid_commit: bool = False, rapid_commit_rejections: bool = False): self.server_id = server_id self.sub_filters = list(sub_filters or []) self.sub_handlers = list(sub_handlers or []) self.allow_rapid_commit = allow_rapid_commit self.rapid_commit_rejections = rapid_commit_rejections # Prepare static stuff self.setup_handlers = self.get_setup_handlers() self.cleanup_handlers = self.get_cleanup_handlers()
[docs] def worker_init(self): """ Separate initialisation that will be called in each worker process that is created. Things that can't be forked (think database connections etc) have to be initialised here. """ logger.debug("Initialising MessageHandler in {}".format(multiprocessing.current_process().name)) # Cascade to sub-filters and sub-handlers for sub_filter in self.sub_filters: sub_filter.worker_init() for sub_handler in self.sub_handlers: sub_handler.worker_init()
[docs] def get_handlers(self, bundle: TransactionBundle) -> List[Handler]: """ Get all handlers that are going to be applied to the request in the bundle. :param bundle: The transaction bundle :return: The list of handlers to apply """ # Build the handlers list handlers = [] """:type: [Handler]""" # Add setup handlers handlers += self.setup_handlers # Apply all sub-filters and collect their handlers. The idea behind this is that handlers on more-specific # filters take precedence over handlers on the outer filters. for sub_filter in self.sub_filters: handlers += sub_filter.get_handlers(bundle) # Now add our own handlers handlers += self.sub_handlers # Add cleanup handlers handlers += self.cleanup_handlers return handlers
[docs] def get_setup_handlers(self) -> List[Handler]: """ Build a list of setup handlers and cache it :return: The list of handlers """ handlers = [] """:type: [Handler]""" if self.allow_rapid_commit: # Rapid commit happens as the first thing in the post() stage handlers.append(RapidCommitHandler(self.rapid_commit_rejections)) # These are mandatory handlers.append(ServerIdHandler(duid=self.server_id)) handlers.append(ClientIdHandler()) handlers.append(InterfaceIdOptionHandler()) # Add setup handlers from extensions for extension_name, extension in server_extension_registry.items(): create_setup_handlers = getattr(extension, 'create_setup_handlers', None) if create_setup_handlers: setup_handlers = create_setup_handlers() for setup_handler in setup_handlers: logger.log(DEBUG_HANDLING, "Extension {} added {} to setup phase".format( extension_name, setup_handler.__class__.__name__ )) handlers += setup_handlers return handlers
[docs] @staticmethod def get_cleanup_handlers() -> List[Handler]: """ Build a list of cleanup handlers and cache it :return: The list of handlers """ handlers = [] """:type: [Handler]""" # Reject unicast requests unless they have been explicitly permitted handlers.append(RejectUnwantedUnicastHandler()) # Add cleanup handlers so they run last in the handling phase handlers.append(UnansweredIAOptionHandler()) # Add cleanup handlers from extensions for extension_name, extension in server_extension_registry.items(): create_cleanup_handlers = getattr(extension, 'create_cleanup_handlers', None) if create_cleanup_handlers: cleanup_handlers = create_cleanup_handlers() for cleanup_handler in cleanup_handlers: logger.log(DEBUG_HANDLING, "Extension {} added {} to cleanup phase".format( extension_name, cleanup_handler.__class__.__name__ )) handlers += cleanup_handlers # Confirm/Release/Decline messages always need a status handlers.append(AddMissingStatusOptionHandler()) return handlers
[docs] @staticmethod def init_response(bundle: TransactionBundle): """ Create the message object in bundle.response :param bundle: The transaction bundle """ # Start building the response if isinstance(bundle.request, SolicitMessage): bundle.response = AdvertiseMessage(bundle.request.transaction_id) elif isinstance(bundle.request, (RequestMessage, RenewMessage, RebindMessage, ReleaseMessage, DeclineMessage, InformationRequestMessage)): bundle.response = ReplyMessage(bundle.request.transaction_id) elif isinstance(bundle.request, ConfirmMessage): # Receipt of Confirm Messages: If [...] there were no addresses in any of the IAs sent by the client, the # server MUST NOT send a reply to the client. for option in bundle.request.get_options_of_type(IANAOption, IATAOption, IAPDOption): if option.get_options_of_type((IAAddressOption, IAPrefixOption)): # Found an address or prefix option break else: # Not found: ignore request raise CannotRespondError("No IAs present in confirm reply") bundle.response = ReplyMessage(bundle.request.transaction_id) elif isinstance(bundle.request, LeasequeryMessage): # The Leasequery protocol has its own reply type bundle.response = LeasequeryReplyMessage(bundle.request.transaction_id) else: raise CannotRespondError("Do not know how to reply to {}".format(type(bundle.request).__name__)) # Build the plain chain of relay reply messages bundle.create_outgoing_relay_messages()
[docs] def construct_plain_status_reply(self, bundle: TransactionBundle, option: StatusCodeOption) -> ReplyMessage: """ Construct a reply message signalling a status code to the client. :param bundle: The transaction bundle containing the incoming request :param option: The status code option to include in the reply :return: A reply with only the bare necessities and a status code """ return ReplyMessage(bundle.request.transaction_id, options=[ bundle.request.get_option_of_type(ClientIdOption), ServerIdOption(duid=self.server_id), option ])
[docs] def construct_leasequery_status_reply(self, bundle: TransactionBundle, option: StatusCodeOption) -> LeasequeryReplyMessage: """ Construct a leasequery reply message signalling a status code to the client. :param bundle: The transaction bundle containing the incoming request :param option: The status code option to include in the reply :return: A leasequery reply with only the bare necessities and a status code """ return LeasequeryReplyMessage(bundle.request.transaction_id, options=[ bundle.request.get_option_of_type(ClientIdOption), ServerIdOption(duid=self.server_id), option ])
[docs] def construct_use_multicast_reply(self, bundle: TransactionBundle) -> Optional[ReplyMessage]: """ Construct a message signalling to the client that they should have used multicast. :param bundle: The transaction bundle containing the incoming request :return: The proper answer to tell a client to use multicast """ # Make sure we only tell this to requests that came in over unicast if bundle.received_over_multicast: logger.error("Not telling client to use multicast, they already did...") return None return self.construct_plain_status_reply( bundle, StatusCodeOption(STATUS_USE_MULTICAST, "You cannot send requests directly to this server, " "please use the proper multicast addresses") )
[docs] def handle(self, bundle: TransactionBundle, statistics: StatisticsSet): """ The main dispatcher for incoming messages. :param bundle: The transaction bundle :param statistics: Container for shared memory with statistics counters """ if not bundle.request: # Nothing to do... return # Update the allow_rapid_commit flag bundle.allow_rapid_commit = self.allow_rapid_commit # Count the incoming message type statistics.count_message_in(bundle.request.message_type) # Log what we are doing (low-detail, so not DEBUG_HANDLING here) logger.debug("Handling {}".format(bundle)) # Collect the handlers handlers = self.get_handlers(bundle) # Analyse pre for handler in handlers: # noinspection PyBroadException try: handler.analyse_pre(bundle) except: # Ignore all errors, analysis isn't that important logger.exception("{} pre analysis failed".format(handler.__class__.__name__)) try: # Pre-process the request for handler in handlers: handler.pre(bundle) # Init the response self.init_response(bundle) # Process the request for handler in handlers: logger.log(DEBUG_HANDLING, "Applying {}".format(handler)) handler.handle(bundle) # Post-process the request for handler in handlers: handler.post(bundle) except ForOtherServerError as e: # Specific form of CannotRespondError that should have its own log message message = str(e) or 'Message is for another server' logger.debug("{}: ignoring".format(message)) statistics.count_for_other_server() bundle.response = None except CannotRespondError as e: message = str(e) or 'Cannot respond to this message' logger.warning("{}: ignoring".format(message)) statistics.count_do_not_respond() bundle.response = None except UseMulticastError: logger.debug("Unicast request received when multicast is required: informing client") statistics.count_use_multicast() bundle.response = self.construct_use_multicast_reply(bundle) except ReplyWithStatusError as e: # Leasequery has its own reply message type if isinstance(e, ReplyWithLeasequeryError): bundle.response = self.construct_leasequery_status_reply(bundle, e.option) else: bundle.response = self.construct_plain_status_reply(bundle, e.option) logger.warning("Replying with {}".format(e)) # Update the right counter based on the status code if e.option.status_code == STATUS_UNKNOWN_QUERY_TYPE: statistics.count_unknown_query_type() elif e.option.status_code == STATUS_MALFORMED_QUERY: statistics.count_malformed_query() elif e.option.status_code == STATUS_NOT_ALLOWED: statistics.count_not_allowed() else: statistics.count_other_error() # Analyse post for handler in handlers: # noinspection PyBroadException try: handler.analyse_post(bundle) except: # Ignore all errors, analysis isn't that important logger.exception("{} post analysis failed".format(handler.__class__.__name__)) if bundle.response: logger.log(DEBUG_HANDLING, "Responding with {}".format(bundle.response.__class__.__name__)) # Count the outgoing message type statistics.count_message_out(bundle.response.message_type) else: logger.log(DEBUG_HANDLING, "Not responding")