"""
Statistics about the server in shared memory
"""
from collections import OrderedDict
from ctypes import c_uint64
from multiprocessing import Value
from multiprocessing.sharedctypes import Synchronized
from dhcpkit.ipv6.message_registry import message_registry
from dhcpkit.ipv6.messages import ClientServerMessage
from dhcpkit.ipv6.server.transaction_bundle import TransactionBundle
from dhcpkit.utils import camelcase_to_underscore
from typing import Dict, Hashable, Iterable, List
def create_update_method(counter_name):
    """
    Build a method that adds one to a single named counter attribute.

    The returned function is meant to be assigned as a method: it looks up the
    attribute called ``counter_name`` on ``self`` and increments its ``value``
    while holding the lock returned by the counter's ``get_lock()``.

    :param counter_name: The name of the counter attribute to increment
    :return: The generated method
    """

    def count_method(self):
        """
        Increment the counter by one while holding its lock
        """
        shared_counter = getattr(self, counter_name)
        # The read-modify-write of ``value`` is done under the counter's own lock
        with shared_counter.get_lock():
            shared_counter.value += 1

    return count_method
def create_update_dict_method(counter_name):
    """
    Build a method that adds one to a counter stored in a dictionary attribute.

    The returned function is meant to be assigned as a method: it looks up the
    dictionary called ``counter_name`` on ``self`` and increments the counter
    stored under the given key. Keys that are not in the dictionary are ignored.

    :param counter_name: The name of the dictionary attribute holding the counters
    :return: The generated method
    """

    def count_method(self, key):
        """
        Increment the counter stored under the given key, if there is one
        """
        shared_counter = getattr(self, counter_name).get(key)
        if shared_counter is None:
            # No counter is kept for this key: nothing to update
            return

        with shared_counter.get_lock():
            shared_counter.value += 1

    return count_method
def create_count_method(method_name: str):
    """
    Build a fan-out method for the StatisticsSet class.

    The returned function is meant to be assigned as a method: it walks
    ``self.statistics_set`` and calls the method called ``method_name``,
    without arguments, on every member.

    :param method_name: The name of the method to call on Statistics objects
    :return: The generated method
    """

    def count_method(self):
        """
        Call the counting method on every statistics object in the set
        """
        for member in self.statistics_set:
            getattr(member, method_name)()

    return count_method
def create_count_dict_method(method_name: str):
    """
    Build a keyed fan-out method for the StatisticsSet class.

    The returned function is meant to be assigned as a method: it walks
    ``self.statistics_set`` and calls the method called ``method_name`` on
    every member, passing the given key through unchanged.

    :param method_name: The name of the method to call on Statistics objects
    :return: The generated method
    """

    def count_method(self, key):
        """
        Call the keyed counting method on every statistics object in the set
        """
        for member in self.statistics_set:
            getattr(member, method_name)(key)

    return count_method
class Statistics:
    """
    One group of DHCPv6 counters, each held in a ``multiprocessing`` ``Value``

    :type incoming_packets: Synchronized
    :type outgoing_packets: Synchronized
    :type unparsable_packets: Synchronized
    :type handling_errors: Synchronized
    :type for_other_server: Synchronized
    :type do_not_respond: Synchronized
    :type use_multicast: Synchronized
    :type unknown_query_type: Synchronized
    :type malformed_query: Synchronized
    :type not_allowed: Synchronized
    :type other_error: Synchronized
    :type messages_in: Dict[int, Synchronized]
    :type messages_out: Dict[int, Synchronized]
    """

    def __init__(self):
        # Packet counts
        self.incoming_packets = Value(c_uint64)
        self.outgoing_packets = Value(c_uint64)

        # Errors
        self.unparsable_packets = Value(c_uint64)
        self.handling_errors = Value(c_uint64)

        # Special replies
        self.for_other_server = Value(c_uint64)
        self.do_not_respond = Value(c_uint64)
        self.use_multicast = Value(c_uint64)
        self.unknown_query_type = Value(c_uint64)
        self.malformed_query = Value(c_uint64)
        self.not_allowed = Value(c_uint64)
        self.other_error = Value(c_uint64)

        # Counters per message type, keyed by the message class's message_type
        self.messages_in = OrderedDict()
        self.messages_out = OrderedDict()

        # Walk the registry in sorted key order so both dictionaries have a stable order
        for registry_key in sorted(message_registry.keys()):
            message_class = message_registry[registry_key]

            if message_class.from_client_to_server and issubclass(message_class, ClientServerMessage):
                self.messages_in[message_class.message_type] = Value(c_uint64)

            if message_class.from_server_to_client and issubclass(message_class, ClientServerMessage):
                self.messages_out[message_class.message_type] = Value(c_uint64)

    def __str__(self):
        def message_rows(counters):
            """
            Render one '- Name: count' row per message type counter
            """
            rows = []
            for message_type, counter in counters.items():
                display_name = message_registry[message_type].__name__
                if display_name.endswith('Message'):
                    # Drop the 7-character 'Message' suffix from the class name
                    display_name = display_name[:-7]
                rows.append('- {}: {}'.format(display_name, counter.value))
            return rows

        report = [
            "Packets",
            "- Incoming packets: {}".format(self.incoming_packets.value),
            "- Outgoing packets: {}".format(self.outgoing_packets.value),
            "Errors",
            "- Unparsable packets: {}".format(self.unparsable_packets.value),
            "- Handling errors: {}".format(self.handling_errors.value),
            "Special replies",
            "- For other server: {}".format(self.for_other_server.value),
            "- Do not respond: {}".format(self.do_not_respond.value),
            "- Use multicast: {}".format(self.use_multicast.value),
            "- Unknown query type: {}".format(self.unknown_query_type.value),
            "- Malformed query: {}".format(self.malformed_query.value),
            "- Not allowed: {}".format(self.not_allowed.value),
            "- Other error: {}".format(self.other_error.value),
            "Incoming messages",
        ]
        report.extend(message_rows(self.messages_in))
        report.append('Outgoing messages')
        report.extend(message_rows(self.messages_out))

        return '\n'.join(report)

    def export(self) -> Dict[str, int]:
        """
        Export the current counter values.

        The simple counters are exported under their attribute names; the
        per-message counters are exported as nested ordered dictionaries under
        ``messages_in`` and ``messages_out``, keyed by the underscored message
        class name without its ``_message`` suffix.

        :return: The counters in a processable format
        """

        def by_message_name(counters):
            """
            Map underscored message names to the current counter values
            """
            named = OrderedDict()
            for message_type, counter in counters.items():
                export_name = camelcase_to_underscore(message_registry[message_type].__name__)
                if export_name.endswith('_message'):
                    # Drop the 8-character '_message' suffix
                    export_name = export_name[:-8]
                named[export_name] = counter.value
            return named

        out = OrderedDict()
        for counter_name in (
            'incoming_packets',
            'outgoing_packets',
            'unparsable_packets',
            'handling_errors',
            'for_other_server',
            'do_not_respond',
            'use_multicast',
            'unknown_query_type',
            'malformed_query',
            'not_allowed',
            'other_error',
        ):
            out[counter_name] = getattr(self, counter_name).value

        out['messages_in'] = by_message_name(self.messages_in)
        out['messages_out'] = by_message_name(self.messages_out)

        return out

    # Generated methods that each add one to the simple counter they are named after
    count_incoming_packet = create_update_method('incoming_packets')
    count_outgoing_packet = create_update_method('outgoing_packets')
    count_unparsable_packet = create_update_method('unparsable_packets')
    count_handling_error = create_update_method('handling_errors')
    count_for_other_server = create_update_method('for_other_server')
    count_do_not_respond = create_update_method('do_not_respond')
    count_use_multicast = create_update_method('use_multicast')
    count_unknown_query_type = create_update_method('unknown_query_type')
    count_malformed_query = create_update_method('malformed_query')
    count_not_allowed = create_update_method('not_allowed')
    count_other_error = create_update_method('other_error')

    # Generated methods that add one to the counter stored under the given message type
    count_message_in = create_update_dict_method('messages_in')
    count_message_out = create_update_dict_method('messages_out')
class StatisticsSet:
    """
    A set of statistics objects that are updated together. Every counting method defined here is generated by
    ``create_count_method`` or ``create_count_dict_method`` and calls the method of the same name on each member.
    """

    def __init__(self, statistics_set: Iterable[Statistics] = None):
        # A missing or empty argument results in an empty set
        self.statistics_set = set(statistics_set) if statistics_set else set()

    # Fan-out versions of the simple counting methods
    count_incoming_packet = create_count_method('count_incoming_packet')
    count_outgoing_packet = create_count_method('count_outgoing_packet')
    count_unparsable_packet = create_count_method('count_unparsable_packet')
    count_handling_error = create_count_method('count_handling_error')
    count_for_other_server = create_count_method('count_for_other_server')
    count_do_not_respond = create_count_method('count_do_not_respond')
    count_use_multicast = create_count_method('count_use_multicast')
    count_unknown_query_type = create_count_method('count_unknown_query_type')
    count_malformed_query = create_count_method('count_malformed_query')
    count_not_allowed = create_count_method('count_not_allowed')
    count_other_error = create_count_method('count_other_error')

    # Fan-out versions of the keyed counting methods
    count_message_in = create_count_dict_method('count_message_in')
    count_message_out = create_count_dict_method('count_message_out')
class ServerStatistics:
    """
    The global DHCPv6 server statistics plus optional per-interface, per-subnet and per-relay statistics

    :type global_stats: Statistics
    :type interface_stats: Dict[str, Statistics]
    :type subnet_stats: Dict[IPv6Network, Statistics]
    :type relay_stats: Dict[IPv6Address, Statistics]
    """

    def __init__(self):
        self.global_stats = Statistics()

        # On-demand categories, filled in by set_categories()
        self.interface_stats = {}
        self.subnet_stats = {}
        self.relay_stats = {}

    def set_categories(self, category_settings):
        """
        Make the category dictionaries match the configured interfaces, subnets and relays.

        Statistics objects of categories that are still wanted are kept, missing ones are created and
        categories that are no longer wanted are dropped. Nothing changes when no settings are given.

        :param category_settings: Configuration setting for categories
        """
        if not category_settings:
            return

        def sync_categories(container: Dict[Hashable, Statistics], categories: Iterable[Hashable]):
            """
            Bring a category dictionary in line with the list of wanted categories

            :param container: The container to update
            :param categories: The list of wanted categories
            """
            # Everything present before the update is stale until it turns out to be wanted
            stale = set(container.keys())

            for wanted in categories:
                if wanted not in container:
                    container[wanted] = Statistics()
                stale.discard(wanted)

            # Whatever is left was not asked for anymore
            for unwanted in stale:
                del container[unwanted]

        sync_categories(self.interface_stats, category_settings.interfaces)
        sync_categories(self.subnet_stats, category_settings.subnets)
        sync_categories(self.relay_stats, category_settings.relays)

    def get_update_set(self, interface_name: str = None, bundle: TransactionBundle = None) -> StatisticsSet:
        """
        Collect every statistics object that has to be updated.

        The global statistics are always included. The statistics of the interface are added when it has a
        category, and when a bundle is given also those of every subnet containing the bundle's link address
        and of every relay address that occurs in the bundle's relays.

        :param interface_name: The name of the interface that we received the packet on
        :param bundle: The transaction bundle to base the selection on
        :return: The set to call count methods on
        """
        selected = [self.global_stats]

        if interface_name and interface_name in self.interface_stats:
            selected.append(self.interface_stats[interface_name])

        if bundle:
            link_address = bundle.link_address
            selected.extend(stats for subnet, stats in self.subnet_stats.items() if link_address in subnet)

            relays = bundle.relays
            selected.extend(stats for address, stats in self.relay_stats.items() if address in relays)

        return StatisticsSet(selected)

    def __str__(self):
        def nest(text: str) -> List[str]:
            """
            Split a rendered Statistics block into lines and prefix each one for nesting

            :param text: The rendered statistics
            :return: The prefixed lines
            """
            return [('- ' if not line.startswith('- ') else ' ') + line
                    for line in text.split('\n')]

        def get_category_lines(type_name: str, category_data: Dict[Hashable, Statistics]) -> List[str]:
            """
            Render all categories of one type, in sorted key order

            :param type_name: A descriptive type name for the categories
            :param category_data: A dictionary for categories
            :return: The lines for the provided category
            """
            sub_lines = []
            for name in sorted(category_data.keys()):
                # An empty line separates each category from whatever came before it
                sub_lines.append('')
                sub_lines.append('{} {}'.format(type_name, name))
                sub_lines.extend(nest(str(category_data[name])))
            return sub_lines

        lines = ['Global']
        lines.extend(nest(str(self.global_stats)))
        lines.extend(get_category_lines('Interface', self.interface_stats))
        lines.extend(get_category_lines('Subnet', self.subnet_stats))
        lines.extend(get_category_lines('Relay', self.relay_stats))

        return '\n'.join(lines)

    def export(self) -> Dict[str, int]:
        """
        Export the global counters and the counters of every category.

        The result has the keys ``global``, ``interfaces``, ``subnets`` and ``relays``. The last three map
        the string form of each category key, in sorted key order, to the export of its statistics.

        :return: The counters in a processable format
        """

        def get_category_data(category_data: Dict[Hashable, Statistics]) -> Dict[str, dict]:
            """
            Export all categories of one type, in sorted key order

            :param category_data: A dictionary for categories
            :return: The data for the provided categories
            """
            data = OrderedDict()
            for name in sorted(category_data.keys()):
                data[str(name)] = category_data[name].export()
            return data

        out = OrderedDict()
        out['global'] = self.global_stats.export()
        out['interfaces'] = get_category_data(self.interface_stats)
        out['subnets'] = get_category_data(self.subnet_stats)
        out['relays'] = get_category_data(self.relay_stats)

        return out