Source code for dhcpkit.ipv6.extensions.dslite

"""
Implementation of DS-Lite AFTR Name option as specified in :rfc:`6334`.
"""

from struct import pack
from typing import Union

from dhcpkit.ipv6.messages import AdvertiseMessage, InformationRequestMessage, RebindMessage, RenewMessage, \
    ReplyMessage, RequestMessage, SolicitMessage
from dhcpkit.ipv6.options import Option
from dhcpkit.utils import encode_domain, parse_domain_bytes

OPTION_AFTR_NAME = 64


[docs]class AFTRNameOption(Option): """ :rfc:`6334#section-3` The AFTR-Name option consists of option-code and option-len fields (as all DHCPv6 options have), and a variable-length tunnel-endpoint- name field containing a fully qualified domain name that refers to the AFTR to which the client MAY connect. The AFTR-Name option SHOULD NOT appear in any DHCPv6 messages other than the following: Solicit, Advertise, Request, Renew, Rebind, Information-Request, and Reply. The format of the AFTR-Name option is shown in the following figure: .. code-block:: none 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-------------------------------+-------------------------------+ | OPTION_AFTR_NAME: 64 | option-len | +-------------------------------+-------------------------------+ | | | tunnel-endpoint-name (FQDN) | | | +---------------------------------------------------------------+ OPTION_AFTR_NAME 64 option-len Length of the tunnel-endpoint-name field in octets. tunnel-endpoint-name A fully qualified domain name of the AFTR tunnel endpoint. :type fqdn: str """ option_type = OPTION_AFTR_NAME def __init__(self, fqdn: str = ''): self.fqdn = fqdn """Domain name of the AFTR tunnel endpoint"""
[docs] def validate(self): """ Validate that the contents of this object conform to protocol specs. """ if not isinstance(self.fqdn, str): raise ValueError("Tunnel endpoint name must be a string") # Let the domain encoder check for errors fqdn = encode_domain(self.fqdn) if len(fqdn) < 4: raise ValueError("The FQDN of the tunnel endpoint is too short")
[docs] def load_from(self, buffer: bytes, offset: int = 0, length: int = None) -> int: """ Load the internal state of this object from the given buffer. The buffer may contain more data after the structured element is parsed. This data is ignored. :param buffer: The buffer to read data from :param offset: The offset in the buffer where to start reading :param length: The amount of data we are allowed to read from the buffer :return: The number of bytes used from the buffer """ my_offset, option_len = self.parse_option_header(buffer, offset, length, min_length=4) header_offset = my_offset # Parse the domain label max_offset = option_len + header_offset # The option_len field counts bytes *after* the header fields name_len, self.fqdn = parse_domain_bytes(buffer, offset=offset + my_offset, length=option_len) my_offset += name_len if my_offset != max_offset: raise ValueError('Option length does not match the length of the included fqdn') return my_offset
[docs] def save(self) -> Union[bytes, bytearray]: """ Save the internal state of this object as a buffer. :return: The buffer with the data from this element """ fqdn_buffer = encode_domain(self.fqdn) buffer = bytearray() buffer.extend(pack('!HH', self.option_type, len(fqdn_buffer))) buffer.extend(fqdn_buffer) return buffer
# Update the messages where this option may appear SolicitMessage.add_may_contain(AFTRNameOption, 0, 1) AdvertiseMessage.add_may_contain(AFTRNameOption, 0, 1) RequestMessage.add_may_contain(AFTRNameOption, 0, 1) RenewMessage.add_may_contain(AFTRNameOption, 0, 1) RebindMessage.add_may_contain(AFTRNameOption, 0, 1) InformationRequestMessage.add_may_contain(AFTRNameOption, 0, 1) ReplyMessage.add_may_contain(AFTRNameOption, 0, 1)