Source code for dhcpkit.ipv6.server.filters.elapsed_time.config

"""
Filter on elapsed time indicated by the client
"""
import operator
from collections import namedtuple

from cached_property import cached_property
from dhcpkit.ipv6.options import ElapsedTimeOption
from dhcpkit.ipv6.server.filters import Filter, FilterFactory
from dhcpkit.ipv6.server.transaction_bundle import TransactionBundle
from dhcpkit.utils import camelcase_to_dash
from typing import List

TimeLimit = namedtuple('TimeLimit', ['operator', 'limit'])


[docs]class ElapsedTimeFilter(Filter): """ Filter on marks that have been placed on the incoming message """ @cached_property def filter_description(self) -> str: """ A short description of this filter for log messages. :return: The description """ simple_name = camelcase_to_dash(self.__class__.__name__) if simple_name.endswith('-filter'): simple_name = simple_name[:-7] return simple_name + ' ' + ' and '.join(["{operator} {value}".format(operator=condition.operator.__name__, value=condition.limit) for condition in self.filter_condition])
[docs] def match(self, bundle: TransactionBundle) -> bool: """ Check if the elapsed time is within the configured limits :param bundle: The transaction bundle :return: Whether the elapsed time is within the limits """ elapsed_time_option = bundle.request.get_option_of_type(ElapsedTimeOption) if not elapsed_time_option: # If there is no elapsed time then ignore the request return False for time_limit in self.filter_condition: if not time_limit.operator(elapsed_time_option.elapsed_time, time_limit.limit): # It failed the check return False # Apparently they all match return True
[docs]class ElapsedTimeFilterFactory(FilterFactory): """ Create a MarkedWithFilter """ filter_class = ElapsedTimeFilter
[docs] def validate_config_section(self): """ Check that at least one filter condition is provided, and that if multiple conditions are provided they are compatible with each other. """ if not self.less_than and not self.more_than: raise ValueError("At least one time limit ('more-than' or 'less-than') must be configured") if self.less_than and self.more_than and self.less_than <= self.more_than: raise ValueError("The timing limits of 'more-than' and 'less-than' conflict")
@property def filter_condition(self) -> List[TimeLimit]: """ The filter condition is based on the configured time limits. :return: A list of time limits """ limits = [] if self.more_than: # ElapsedTime contains the time in 1/100 of a second, but we just use second precision, so multiply limits.append(TimeLimit(operator=operator.gt, limit=self.more_than * 100)) if self.less_than: # ElapsedTime contains the time in 1/100 of a second, but we just use second precision, so multiply limits.append(TimeLimit(operator=operator.lt, limit=self.less_than * 100)) return limits