Source code for dhcpkit.ipv6.server.main

"""
The main server process
"""
import argparse
import atexit
import fcntl
import json
import logging.handlers
import multiprocessing
import multiprocessing.queues
import os
import pwd
import selectors
import signal
import sys
import time
from multiprocessing import forkserver
from multiprocessing.util import get_logger
from urllib.parse import urlparse

import dhcpkit
from ZConfig import ConfigurationSyntaxError, DataConversionError
from dhcpkit.common.privileges import drop_privileges, restore_privileges
from dhcpkit.common.server.logging.config_elements import set_verbosity_logger
from dhcpkit.ipv6.server import config_parser, queue_logger
from dhcpkit.ipv6.server.config_elements import MainConfig
from dhcpkit.ipv6.server.control_socket import ControlConnection, ControlSocket
from dhcpkit.ipv6.server.listeners import ClosedListener, IgnoreMessage, Listener, ListenerCreator
from dhcpkit.ipv6.server.nonblocking_pool import NonBlockingPool
from dhcpkit.ipv6.server.queue_logger import WorkerQueueHandler
from dhcpkit.ipv6.server.statistics import ServerStatistics
from dhcpkit.ipv6.server.worker import handle_message, setup_worker
from typing import Iterable, Optional

# The root logger. main() (re)configures its handlers from the server configuration and all
# functions in this module log through it.
logger = logging.getLogger()

# The queue_logger.QueueLevelListener that main() creates around the logging queue and replaces
# on every (re)configuration. It stays None until main() has started one, and it is stopped at
# interpreter exit by stop_logging_thread().
logging_thread = None


@atexit.register
def stop_logging_thread():
    """
    Stop the module-level logging thread, if one has been started.

    Registered with :mod:`atexit` so it runs when the interpreter exits.
    """
    thread = logging_thread
    if not thread:
        # Nothing was ever started, so there is nothing to stop
        return

    thread.stop()
def error_callback(exception):
    """
    Log an exception that was raised while a worker was handling a message.

    When the exception has an explicit cause (``raise ... from ...``) the cause is appended to the
    logged message.

    :param exception: The exception that occurred
    """
    fragments = ["Unexpected exception while delegating handling to worker {}".format(exception)]

    cause = exception.__cause__
    if cause:
        fragments.append(str(cause))

    logger.error(":".join(fragments))
def handle_args(args: Iterable[str]):
    """
    Parse the command line arguments of the server.

    :param args: Command line arguments (without the program name)
    :return: The namespace object with the parsed arguments
    """
    # (flags, keyword arguments) for every supported argument, in the order they are registered
    argument_table = (
        (("config",),
         {"help": "the configuration file"}),
        (("-v", "--verbosity"),
         {"action": "count", "default": 0, "help": "increase output verbosity"}),
        (("-c", "--control-socket"),
         {"action": "store", "metavar": "FILENAME", "help": "location of domain socket for server control"}),
        (("-p", "--pidfile"),
         {"action": "store", "help": "save the server's PID to this file"}),
    )

    parser = argparse.ArgumentParser(description="A flexible IPv6 DHCP server written in Python.")
    for flags, options in argument_table:
        parser.add_argument(*flags, **options)

    return parser.parse_args(args)
def create_pidfile(args, config: MainConfig) -> Optional[str]:
    """
    Create a PID file when configured to do so.

    The location given on the command line takes precedence over the one in the configuration
    file. Any file already present at that location is removed first. The file is created under a
    temporary umask of 022, which is always restored afterwards, also when writing the file fails.

    :param args: The command line arguments
    :param config: The server configuration
    :return: The name of the created PID file, or None when no PID file is configured
    :raises OSError: When the PID file cannot be created or written
    """
    # The command line overrides the configuration file
    configured_name = args.pidfile or config.pid_file
    if not configured_name:
        return None

    pid_filename = os.path.realpath(configured_name)

    # A different umask for here: the PID file must not be writable by group or others
    old_umask = os.umask(0o022)
    try:
        # Best effort removal of a stale PID file; a real problem will surface in open() below
        try:
            os.unlink(pid_filename)
        except OSError:
            pass

        with open(pid_filename, 'w') as pidfile:
            logger.info("Writing PID-file {}".format(pid_filename))
            pidfile.write("{}\n".format(os.getpid()))
    finally:
        # Never leave the process running with our temporary umask
        os.umask(old_umask)

    return pid_filename
def create_control_socket(args, config: MainConfig) -> Optional[ControlSocket]:
    """
    Create a control socket when configured to do so.

    The location given on the command line takes precedence over the one in the configuration
    file. The socket is created under a temporary umask of 117, which is always restored
    afterwards, also when creating the socket or changing its owner fails.

    :param args: The command line arguments
    :param config: The server configuration
    :return: The created control socket, or None when no control socket is configured
    """
    # The command line overrides the configuration file
    configured_name = args.control_socket or config.control_socket
    if not configured_name:
        return None

    socket_filename = os.path.realpath(configured_name)

    # Default to the user that started the server
    control_socket_user = config.control_socket_user if config.control_socket_user else pwd.getpwuid(os.getuid())
    uid = control_socket_user.pw_uid
    gid = config.control_socket_group.gr_gid if config.control_socket_group else control_socket_user.pw_gid

    # A different umask for here: no execute bits, and no access at all for others
    old_umask = os.umask(0o117)
    try:
        control_socket = ControlSocket(socket_filename)

        # Change owner if necessary
        if uid != os.geteuid() or gid != os.getegid():
            os.chown(socket_filename, uid, gid)
    finally:
        # Never leave the process running with our temporary umask
        os.umask(old_umask)

    return control_socket
def _format_config_error(error) -> str:
    """
    Turn a ZConfig syntax/conversion error into a single readable line.

    :param error: The ConfigurationSyntaxError or DataConversionError that was raised
    :return: The error message, extended with the line number and file path when known
    """
    msg = str(error.message)
    if error.lineno and error.lineno != -1:
        msg += ' on line {}'.format(error.lineno)
    if error.url:
        parts = urlparse(error.url)
        msg += ' in {}'.format(parts.path)
    return msg


def main(args: Iterable[str]) -> int:
    """
    The main program loop

    After parsing the arguments and loading the configuration this function runs two nested loops.
    The outer loop runs once per loaded configuration: it (re)configures logging, opens listeners,
    writes the PID file, creates the control socket and starts a pool of workers. The inner loop
    waits on the selector and handles incoming requests, signals (delivered through a wakeup pipe)
    and control socket commands. A reload (SIGHUP or the ``reload`` command) leaves the inner loop
    and starts a new iteration of the outer loop; a termination request leaves both.

    :param args: Command line arguments
    :return: The program exit code: 0 after a normal shutdown, 1 when the configuration cannot be
             loaded or the message handler cannot be initialised
    """
    # The logging thread is shared with stop_logging_thread(), which stops it at exit
    global logging_thread

    # Handle command line arguments
    args = handle_args(args)
    set_verbosity_logger(logger, args.verbosity)

    # Go to the working directory
    config_file = os.path.realpath(args.config)
    os.chdir(os.path.dirname(config_file))

    try:
        # Read the configuration
        config = config_parser.load_config(config_file)
    except (ConfigurationSyntaxError, DataConversionError) as e:
        # Make the config exceptions a bit more readable
        logger.critical(_format_config_error(e))
        return 1
    except ValueError as e:
        logger.critical(e)
        return 1

    # Immediately drop privileges in a non-permanent way so we create logs with the correct owner
    drop_privileges(config.user, config.group, permanent=False)

    # Trigger the forkserver at this point, with dropped privileges, and ignoring KeyboardInterrupt
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    multiprocessing.set_start_method('forkserver')
    forkserver.ensure_running()

    # Initialise the logger
    config.logging.configure(logger, verbosity=args.verbosity)
    logger.info("Starting Python DHCPv6 server v{}".format(dhcpkit.__version__))

    # Create our selector
    sel = selectors.DefaultSelector()

    # Convert signals to messages on a pipe
    signal_r, signal_w = os.pipe()
    flags = fcntl.fcntl(signal_w, fcntl.F_GETFL, 0)
    flags = flags | os.O_NONBLOCK
    fcntl.fcntl(signal_w, fcntl.F_SETFL, flags)
    signal.set_wakeup_fd(signal_w)
    sel.register(signal_r, selectors.EVENT_READ)

    # Ignore normal signal handling by attaching dummy handlers (SIG_IGN will not put messages on the pipe)
    signal.signal(signal.SIGINT, lambda signum, frame: None)
    signal.signal(signal.SIGTERM, lambda signum, frame: None)
    signal.signal(signal.SIGHUP, lambda signum, frame: None)
    signal.signal(signal.SIGUSR1, lambda signum, frame: None)

    # Excessive exception catcher
    exception_history = []

    # Some stats
    message_count = 0

    # Create a queue for our children to log to
    logging_queue = multiprocessing.Queue()

    statistics = ServerStatistics()
    listeners = []
    control_socket = None
    stopping = False

    while not stopping:
        # Safety first: assume we want to quit when we break the inner loop unless told otherwise
        stopping = True

        # Initialise the logger again
        lowest_log_level = config.logging.configure(logger, verbosity=args.verbosity)

        # Enable multiprocessing logging, mostly useful for development
        mp_logger = get_logger()
        mp_logger.propagate = config.logging.log_multiprocessing

        # Replace the logging thread of the previous configuration, if any
        if logging_thread:
            logging_thread.stop()

        logging_thread = queue_logger.QueueLevelListener(logging_queue, *logger.handlers)
        logging_thread.start()

        # Use the logging queue in the main process as well so messages don't get out of order
        logging_handler = WorkerQueueHandler(logging_queue)
        logging_handler.setLevel(lowest_log_level)
        logger.handlers = [logging_handler]

        # Restore our privileges while we write the PID file and open network listeners
        restore_privileges()

        # Open the network listeners
        old_listeners = listeners
        listeners = []
        for listener_factory in config.listener_factories:
            # Create new listener while trying to re-use existing sockets
            listeners.append(listener_factory(old_listeners + listeners))

        # Forget old listeners
        del old_listeners

        # Write the PID file
        pid_filename = create_pidfile(args=args, config=config)

        # Create a control socket, replacing the one from the previous configuration
        if control_socket:
            sel.unregister(control_socket)
            control_socket.close()

        control_socket = create_control_socket(args=args, config=config)
        if control_socket:
            sel.register(control_socket, selectors.EVENT_READ)

        # And Drop privileges again
        drop_privileges(config.user, config.group, permanent=False)

        # Remove any file descriptors from the previous config.
        # Iterate over a copy because we unregister while looping.
        for key in list(sel.get_map().values()):
            # Don't remove our signal handling pipe, control socket, still existing listeners and control connections
            if key.fileobj is signal_r \
                    or (control_socket and key.fileobj is control_socket) \
                    or key.fileobj in listeners \
                    or isinstance(key.fileobj, ControlConnection):
                continue

            # Seems we don't need this one anymore
            sel.unregister(key.fileobj)

        # Collect all the file descriptors we want to listen to
        existing_listeners = [key.fileobj for key in sel.get_map().values()]
        for listener in listeners:
            if listener not in existing_listeners:
                sel.register(listener, selectors.EVENT_READ)

        # Configuration tree
        try:
            message_handler = config.create_message_handler()
        except Exception as e:
            # Only show the full traceback when the user asked for a lot of verbosity
            if args.verbosity >= 3:
                logger.exception("Error initialising DHCPv6 server")
            else:
                logger.critical("Error initialising DHCPv6 server: {}".format(e))

            return 1

        # Make sure we have space to store all the interface statistics
        statistics.set_categories(config.statistics)

        # Start worker processes
        my_pid = os.getpid()
        with NonBlockingPool(processes=config.workers, initializer=setup_worker,
                             initargs=(message_handler, logging_queue, lowest_log_level, statistics, my_pid)) as pool:
            logger.info("Python DHCPv6 server is ready to handle requests")

            running = True
            while running:
                count_exception = False

                # noinspection PyBroadException
                try:
                    events = sel.select()
                    for key, _mask in events:
                        if isinstance(key.fileobj, Listener):
                            try:
                                packet, replier = key.fileobj.recv_request()

                                # Update stats
                                message_count += 1

                                # Dispatch
                                pool.apply_async(handle_message, args=(packet, replier), error_callback=error_callback)
                            except IgnoreMessage:
                                # Message isn't complete, leave it for now
                                pass
                            except ClosedListener:
                                # This listener is closed (at least TCP shutdown for incoming data), so forget about it
                                sel.unregister(key.fileobj)
                                listeners.remove(key.fileobj)

                        elif isinstance(key.fileobj, ListenerCreator):
                            # Activity on this object means we have a new listener
                            new_listener = key.fileobj.create_listener()
                            if new_listener:
                                sel.register(new_listener, selectors.EVENT_READ)
                                listeners.append(new_listener)

                        elif key.fileobj == signal_r:
                            # Handle signal notifications: each byte on the pipe is a signal number
                            signal_nr = os.read(signal_r, 1)
                            if signal_nr[0] in (signal.SIGHUP,):
                                # SIGHUP tells the server to reload
                                try:
                                    # Read the new configuration
                                    config = config_parser.load_config(config_file)
                                except (ConfigurationSyntaxError, DataConversionError) as e:
                                    # Make the config exceptions a bit more readable
                                    logger.critical("Not reloading: " + _format_config_error(e))
                                    continue

                                except ValueError as e:
                                    logger.critical("Not reloading: " + str(e))
                                    continue

                                # Leave the inner loop, but let the outer loop start over with the new config
                                logger.info("DHCPv6 server restarting after configuration change")
                                running = False
                                stopping = False
                                continue

                            elif signal_nr[0] in (signal.SIGINT, signal.SIGTERM):
                                logger.debug("Received termination request")

                                running = False
                                stopping = True
                                break

                            elif signal_nr[0] in (signal.SIGUSR1,):
                                # The USR1 signal is used to indicate initialisation errors in worker processes
                                count_exception = True

                        elif isinstance(key.fileobj, ControlSocket):
                            # A new control connection request
                            control_connection = key.fileobj.accept()
                            if control_connection:
                                # We got a connection, listen to events
                                sel.register(control_connection, selectors.EVENT_READ)

                        elif isinstance(key.fileobj, ControlConnection):
                            # Let the connection handle received data
                            control_connection = key.fileobj
                            commands = control_connection.get_commands()
                            for command in commands:
                                if command:
                                    logger.debug("Received control command '{}'".format(command))

                                if command == 'help':
                                    control_connection.send("Recognised commands:")
                                    control_connection.send(" help")
                                    control_connection.send(" stats")
                                    control_connection.send(" stats-json")
                                    control_connection.send(" reload")
                                    control_connection.send(" shutdown")
                                    control_connection.send(" quit")
                                    control_connection.acknowledge()

                                elif command == 'stats':
                                    control_connection.send(str(statistics))
                                    control_connection.acknowledge()

                                elif command == 'stats-json':
                                    control_connection.send(json.dumps(statistics.export()))
                                    control_connection.acknowledge()

                                elif command == 'reload':
                                    # Simulate a SIGHUP to reload
                                    os.write(signal_w, bytes([signal.SIGHUP]))
                                    control_connection.acknowledge('Reloading')

                                elif command == 'shutdown':
                                    # Simulate a SIGTERM to shut down
                                    control_connection.acknowledge('Shutting down')

                                    # A file object must be unregistered from the selector before it is closed
                                    sel.unregister(control_connection)
                                    control_connection.close()

                                    os.write(signal_w, bytes([signal.SIGTERM]))
                                    break

                                elif command == 'quit' or command is None:
                                    if command == 'quit':
                                        # User nicely signing off
                                        control_connection.acknowledge()

                                    # A file object must be unregistered from the selector before it is closed
                                    sel.unregister(control_connection)
                                    control_connection.close()
                                    break

                                else:
                                    logger.warning("Rejecting unknown control command '{}'".format(command))
                                    control_connection.reject()

                except Exception as e:
                    # Catch-all exception handler
                    logger.exception("Caught unexpected exception {!r}".format(e))
                    count_exception = True

                if count_exception:
                    now = time.monotonic()

                    # Add new exception time to the history
                    exception_history.append(now)

                    # Remove exceptions outside the window from the history
                    cutoff = now - config.exception_window
                    while exception_history and exception_history[0] < cutoff:
                        exception_history.pop(0)

                    # Did we receive too many exceptions shortly after each other?
                    if len(exception_history) > config.max_exceptions:
                        logger.critical("Received more than {} exceptions in {} seconds, "
                                        "exiting".format(config.max_exceptions, config.exception_window))
                        running = False
                        stopping = True

            # Stop accepting work and wait for the workers before leaving the pool's context
            pool.close()
            pool.join()

    # Regain root so we can delete the PID file and control socket
    restore_privileges()

    # Removing the files is best effort: failing to delete them must not prevent a clean exit
    try:
        if pid_filename:
            os.unlink(pid_filename)
            logger.info("Removing PID-file {}".format(pid_filename))
    except OSError:
        pass

    try:
        if control_socket:
            os.unlink(control_socket.socket_path)
            logger.info("Removing control socket {}".format(control_socket.socket_path))
    except OSError:
        pass

    logger.info("Shutting down Python DHCPv6 server v{}".format(dhcpkit.__version__))

    return 0
def run() -> int:
    """
    Entry point wrapper: run :func:`main` with the process' command line arguments.

    Any exception escaping from the server is logged with its traceback and turned into exit
    code 1 instead of propagating.

    :return: The program exit code
    """
    try:
        # Run the server
        exit_code = main(sys.argv[1:])
    except Exception as error:
        logger.exception("Error: {}".format(error))
        exit_code = 1

    return exit_code
if __name__ == '__main__':
    # Use the exit code of the server as the exit status of the process
    raise SystemExit(run())