"""
Worker process for handling requests using multiprocessing.
"""
import logging
import logging.handlers
import os
import re
import signal
import sys
from multiprocessing import Queue, current_process
from dhcpkit.ipv6.messages import Message, RelayForwardMessage, RelayReplyMessage
from dhcpkit.ipv6.options import InterfaceIdOption, Option, RelayMessageOption
from dhcpkit.ipv6.server.listeners import IncomingPacketBundle, Replier
from dhcpkit.ipv6.server.message_handler import MessageHandler
from dhcpkit.ipv6.server.queue_logger import WorkerQueueHandler
from dhcpkit.ipv6.server.statistics import ServerStatistics
from dhcpkit.ipv6.server.transaction_bundle import TransactionBundle
from typing import Iterable
logger = None
""":type: logging.Logger"""
logging_handler = None
""":type: WorkerQueueHandler"""
current_message_handler = None
""":type: MessageHandler"""
shared_statistics = None
""":type: ServerStatistics"""
[docs]def setup_worker(message_handler: MessageHandler, logging_queue: Queue, lowest_log_level: int,
statistics: ServerStatistics, master_pid: int):
"""
This function will be called after a new worker process has been created. Its purpose is to set the global
variables in this specific worker process so that they can be reused across multiple requests. Otherwise we would
have to pickle them each and every time, and because they are static that would be a waste.
:param message_handler: The message handler for the incoming requests
:param logging_queue: The queue where we can deposit log messages so the main process can log them
:param lowest_log_level: The lowest log level that is going to be handled by the main process
:param statistics: Container for shared memory with statistics counters
:param master_pid: The PID of the master process, in case we have critical errors while initialising
"""
try:
# Let's shorten the process name a bit by removing everything except the "Worker-x" bit at the end
this_process = current_process()
this_process.name = re.sub(r'^.*(Worker-\d+)$', r'\1', this_process.name)
# Ignore normal signal handling
signal.signal(signal.SIGINT, lambda signum, frame: None)
signal.signal(signal.SIGTERM, lambda signum, frame: None)
signal.signal(signal.SIGHUP, lambda signum, frame: None)
# Save the logger, don't let it filter, send everything to the queue
global logger
logger = logging.getLogger()
logger.setLevel(logging.NOTSET)
global logging_handler
logging_handler = WorkerQueueHandler(logging_queue)
logging_handler.setLevel(lowest_log_level)
logger.addHandler(logging_handler)
# Save the message handler
global current_message_handler
current_message_handler = message_handler
global shared_statistics
shared_statistics = statistics
# Run the per-process startup code for the message handler and its children
message_handler.worker_init()
except Exception as e:
if logger:
logger.error("Error initialising worker: {}".format(e))
# Signal our predicament
os.kill(master_pid, signal.SIGUSR1)
# Redirect to stderr to prevent stack traces on console
sys.stderr = open(os.devnull, 'w')
raise e
[docs]def parse_incoming_request(incoming_packet: IncomingPacketBundle) -> TransactionBundle:
"""
Parse the incoming packet and add a RelayServerMessage around it containing the meta-data received from the
listener.
:param incoming_packet: The received packet
:return: The parsed message in a transaction bundle
"""
# Parse message and validate
length, incoming_message = Message.parse(incoming_packet.data)
incoming_message.validate()
# Determine the next hop count and construct useful log messages
if isinstance(incoming_message, RelayForwardMessage):
next_hop_count = incoming_message.hop_count + 1
else:
next_hop_count = 0
# Collect the relay options
relay_options = []
""":type: List[Option]"""
relay_options.extend(incoming_packet.relay_options)
relay_options.append(RelayMessageOption(relayed_message=incoming_message))
# Pretend to be an internal relay and wrap the message like a relay would
wrapped_message = RelayForwardMessage(hop_count=next_hop_count,
link_address=incoming_packet.link_address,
peer_address=incoming_packet.source_address,
options=relay_options)
# Create the transaction bundle
return TransactionBundle(incoming_message=wrapped_message,
received_over_multicast=incoming_packet.received_over_multicast,
received_over_tcp=incoming_packet.received_over_tcp,
marks=incoming_packet.marks)
[docs]def verify_response(outgoing_message: Message):
"""
generate the outgoing packet and check the RelayServerMessage around it.
:param outgoing_message: The reply message
"""
# Verify that the outer relay message makes sense
if not isinstance(outgoing_message, RelayReplyMessage):
raise ValueError("The reply has to be wrapped in a RelayReplyMessage")
reply = outgoing_message.relayed_message
if not reply:
raise ValueError("The RelayReplyMessage does not contain a message")
[docs]def get_interface_name_from_options(options: Iterable[Option]):
"""
Get the interface name from the given options and decode it as unicode
:param options: A list of options
:return: The interface name
"""
for option in options:
if isinstance(option, InterfaceIdOption):
try:
# Found: decode
return option.interface_id.decode(encoding='utf-8', errors='replace')
except UnicodeDecodeError:
pass
# Fallback
return 'unknown'
[docs]def handle_message(incoming_packet: IncomingPacketBundle, replier: Replier):
"""
Handle a single incoming request. This is supposed to be called in a separate worker thread that has been
initialised with setup_worker().
:param incoming_packet: The raw incoming request
:param replier: The object that will send replies for us
:returns: The packet to reply with and the destination
"""
# Set the log_id to make it easier to correlate log messages
logging_handler.log_id = incoming_packet.message_id
# Get the interface name from the incoming packet bundle
interface_name = get_interface_name_from_options(incoming_packet.relay_options)
# Until we parsed the packet we can only update global and interface statistics
statistics = shared_statistics.get_update_set(interface_name=interface_name)
try:
try:
# Parse the packet
bundle = parse_incoming_request(incoming_packet)
except Exception as e:
logger.error("Error while parsing request: {}".format(e))
# Count the packet on the statistics counters that we have
statistics.count_incoming_packet()
statistics.count_unparsable_packet()
return
# Now we know more: update all statistics and count the packet on all
statistics = shared_statistics.get_update_set(interface_name=interface_name, bundle=bundle)
statistics.count_incoming_packet()
try:
current_message_handler.handle(bundle, statistics)
for outgoing_message in bundle.outgoing_messages:
verify_response(outgoing_message)
statistics.count_outgoing_packet()
try:
replier.send_reply(outgoing_message)
except ValueError as e:
logger.error("Handler returned invalid message: {}".format(e))
except Exception as e:
logger.exception("Error while handling request: {}".format(e))
statistics.count_handling_error()
finally:
# Always reset the log_id when leaving
logging_handler.log_id = None