Source code for dhcpkit.ipv6.extensions.map

"""
Implementation of MAP options as specified in :rfc:`7598`.
"""
import math
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
from struct import pack, unpack
from typing import Iterable, List, Optional, Type, Union

from dhcpkit.ipv6.messages import AdvertiseMessage, ConfirmMessage, RebindMessage, ReleaseMessage, RenewMessage, \
    ReplyMessage, RequestMessage, SolicitMessage
from dhcpkit.ipv6.options import Option, SomeOption

OPTION_S46_RULE = 89
OPTION_S46_BR = 90
OPTION_S46_DMR = 91
OPTION_S46_V4V6BIND = 92
OPTION_S46_PORTPARAMS = 93
OPTION_S46_CONT_MAPE = 94
OPTION_S46_CONT_MAPT = 95
OPTION_S46_CONT_LW = 96


[docs]class S46RuleOption(Option): """ :rfc:`7598#section-4.1` Figure 1 shows the format of the S46 Rule option (OPTION_S46_RULE) used for conveying the Basic Mapping Rule (BMR) and Forwarding Mapping Rule (FMR). This option follows behavior described in Sections 17.1.1 and 18.1.1 of [RFC3315]. Clients can send those options, encapsulated in their respective container options, with specific values as hints for the server. See Section 5 for details. Depending on the server configuration and policy, it may accept or ignore the hints. Clients MUST be able to process received values that are different than the hints it sent earlier. .. code-block:: none 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | OPTION_S46_RULE | option-length | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | flags | ea-len | prefix4-len | ipv4-prefix | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | (continued) | prefix6-len | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | ipv6-prefix | | (variable length) | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | | . S46_RULE-options . . . +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Figure 1: S46 Rule Option option-code OPTION_S46_RULE (89) option-length length of the option, excluding option-code and option-length fields, including length of all encapsulated options; expressed in octets. flags 8 bits long; carries flags applicable to the rule. The meanings of the specific bits are explained in Figure 2. ea-len 8 bits long; specifies the Embedded Address (EA) bit length. Allowed values range from 0 to 48. prefix4-len 8 bits long; expresses the prefix length of the Rule IPv4 prefix specified in the ipv4-prefix field. Allowed values range from 0 to 32. ipv4-prefix a fixed-length 32-bit field that specifies the IPv4 prefix for the S46 rule. The bits in the prefix after prefix4-len number of bits are reserved and MUST be initialized to zero by the sender and ignored by the receiver. prefix6-len 8 bits long; expresses the length of the Rule IPv6 prefix specified in the ipv6-prefix field. Allowed values range from 0 to 128. ipv6-prefix a variable-length field that specifies the IPv6 domain prefix for the S46 rule. The field is padded on the right with zero bits up to the nearest octet boundary when prefix6-len is not evenly divisible by 8. S46_RULE-options a variable-length field that may contain zero or more options that specify additional parameters for this S46 rule. This document specifies one such option: OPTION_S46_PORTPARAMS. The format of the S46 Rule Flags field is: .. code-block:: none 0 1 2 3 4 5 6 7 +-+-+-+-+-+-+-+-+ |Reserved |F| +-+-+-+-+-+-+-+-+ Figure 2: S46 Rule Flags Reserved 7 bits; reserved for future use as flags. F-flag 1-bit field that specifies whether the rule is to be used for forwarding (FMR). If set, this rule is used as an FMR; if not set, this rule is a BMR only and MUST NOT be used for forwarding. Note: A BMR can also be used as an FMR for forwarding if the F-flag is set. The BMR is determined by a longest-prefix match of the Rule IPv6 prefix against the End-user IPv6 prefix(es). It is expected that in a typical mesh deployment scenario there will be a single BMR, which could also be designated as an FMR using the F-flag. """ option_type = OPTION_S46_RULE def __init__(self, flags: int = 0, ea_len: int = 0, ipv4_prefix: IPv4Network = None, ipv6_prefix: IPv6Network = None, options: Iterable[Option] = None): self.flags = flags self.ea_len = ea_len self.ipv4_prefix = ipv4_prefix or IPv4Network('0.0.0.0/0') self.ipv6_prefix = ipv6_prefix or IPv6Network('::/0') self.options = list(options or []) @property def fmr(self): """ Extract the F flag :return: Whether the F flag is set """ return bool(self.flags & 1) @fmr.setter def fmr(self, value: bool): """ Set/unset the F flag :param value: The new value of the F flag """ if value: self.flags |= 1 else: self.flags &= ~1
[docs] def validate(self): """ Validate that the contents of this object conform to protocol specs. """ if not isinstance(self.flags, int) or not (0 <= self.flags < 2 ** 8): raise ValueError("Flags must be an unsigned 8 bit integer") if not isinstance(self.ea_len, int) or not (0 <= self.ea_len <= 48): raise ValueError("EA-len value must be an integer in range from 0 to 48") if not isinstance(self.ipv4_prefix, IPv4Network): raise ValueError("IPv4 prefix must be an IPv4Network") if not isinstance(self.ipv6_prefix, IPv6Network): raise ValueError("IPv6 prefix must be an IPv6Network") # Check if all options are allowed self.validate_contains(self.options) for option in self.options: option.validate()
[docs] def load_from(self, buffer: bytes, offset: int = 0, length: int = None) -> int: """ Load the internal state of this object from the given buffer. The buffer may contain more data after the structured element is parsed. This data is ignored. :param buffer: The buffer to read data from :param offset: The offset in the buffer where to start reading :param length: The amount of data we are allowed to read from the buffer :return: The number of bytes used from the buffer """ my_offset, option_len = self.parse_option_header(buffer, offset, length, min_length=8) header_offset = my_offset # Flags self.flags = buffer[offset + my_offset] my_offset += 1 # EA-Len self.ea_len = buffer[offset + my_offset] my_offset += 1 # IPv4 prefix ipv4_prefix_length = buffer[offset + my_offset] my_offset += 1 if not (0 <= ipv4_prefix_length <= 32): raise ValueError("IPv4 prefix length must be in range from 0 to 32") ipv4_address = IPv4Address(buffer[offset + my_offset:offset + my_offset + 4]) my_offset += 4 # Combine address and prefix length into prefix self.ipv4_prefix = IPv4Network('{!s}/{:d}'.format(ipv4_address, ipv4_prefix_length), strict=False) # IPv6 prefix ipv6_prefix_length = buffer[offset + my_offset] my_offset += 1 if not (0 <= ipv6_prefix_length <= 128): raise ValueError("IPv6 prefix length must be in range from 0 to 128") included_octets = math.ceil(ipv6_prefix_length / 8) ipv6_address = IPv6Address(buffer[offset + my_offset:offset + my_offset + included_octets].ljust(16, b'\x00')) my_offset += included_octets self.ipv6_prefix = IPv6Network('{!s}/{:d}'.format(ipv6_address, ipv6_prefix_length), strict=False) # Parse the options self.options = [] max_offset = option_len + header_offset # The option_len field counts bytes *after* the header fields while max_offset > my_offset: used_buffer, option = Option.parse(buffer, offset=offset + my_offset) self.options.append(option) my_offset += used_buffer if my_offset != max_offset: raise ValueError('Option length does not match the combined length of the parsed options') return my_offset
[docs] def save(self) -> Union[bytes, bytearray]: """ Save the internal state of this object as a buffer. :return: The buffer with the data from this element """ # Store the minimal amount of octets for the IPv6 prefix included_octets = math.ceil(self.ipv6_prefix.prefixlen / 8) ipv6_address_bytes = self.ipv6_prefix.network_address.packed[:included_octets] # Store the options options_buffer = bytearray() for option in self.options: options_buffer.extend(option.save()) buffer = bytearray() buffer.extend(pack('!HHBBB', self.option_type, len(options_buffer) + included_octets + 8, self.flags, self.ea_len, self.ipv4_prefix.prefixlen)) buffer.extend(self.ipv4_prefix.network_address.packed) buffer.append(self.ipv6_prefix.prefixlen) buffer.extend(ipv6_address_bytes) buffer.extend(options_buffer) return buffer
[docs] def get_options_of_type(self, *args: Type[SomeOption]) -> List[SomeOption]: """ Get all options that are subclasses of the given class. :param args: The classes to look for :returns: The list of options """ classes = tuple(args) # noinspection PyTypeChecker return [option for option in self.options if isinstance(option, classes)]
[docs] def get_option_of_type(self, *args: Type[SomeOption]) -> Optional[SomeOption]: """ Get the first option that is a subclass of the given class. :param args: The classes to look for :returns: The option or None """ classes = tuple(args) for option in self.options: if isinstance(option, classes): # noinspection PyTypeChecker return option
[docs]class S46BROption(Option): """ :rfc:`7598#section-4.2` The S46 BR option (OPTION_S46_BR) is used to convey the IPv6 address of the Border Relay. Figure 3 shows the format of the OPTION_S46_BR option. .. code-block:: none 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | OPTION_S46_BR | option-length | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | br-ipv6-address | | | | | | | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Figure 3: S46 BR Option option-code OPTION_S46_BR (90) option-length 16 br-ipv6-address a fixed-length field of 16 octets that specifies the IPv6 address for the S46 BR. BR redundancy can be implemented by using an anycast address for the BR IPv6 address. Multiple OPTION_S46_BR options MAY be included in the container; this document does not further explore the use of multiple BR IPv6 addresses. """ option_type = OPTION_S46_BR def __init__(self, br_address: IPv6Address = None): self.br_address = br_address
[docs] def validate(self): """ Validate that the contents of this object conform to protocol specs. """ if not isinstance(self.br_address, IPv6Address): raise ValueError("BR address must be an IPv6Address")
[docs] def load_from(self, buffer: bytes, offset: int = 0, length: int = None) -> int: """ Load the internal state of this object from the given buffer. The buffer may contain more data after the structured element is parsed. This data is ignored. :param buffer: The buffer to read data from :param offset: The offset in the buffer where to start reading :param length: The amount of data we are allowed to read from the buffer :return: The number of bytes used from the buffer """ my_offset, option_len = self.parse_option_header(buffer, offset, length, min_length=16, max_length=16) self.br_address = IPv6Address(buffer[offset + my_offset:offset + my_offset + 16]) my_offset += 16 return my_offset
[docs] def save(self) -> Union[bytes, bytearray]: """ Save the internal state of this object as a buffer. :return: The buffer with the data from this element """ buffer = bytearray() buffer.extend(pack('!HH', self.option_type, 16)) buffer.extend(self.br_address.packed) return buffer
[docs]class S46DMROption(Option): """ :rfc:`7598#section-4.3` The S46 DMR option (OPTION_S46_DMR) is used to convey values for the Default Mapping Rule (DMR). Figure 4 shows the format of the OPTION_S46_DMR option used for conveying a DMR. .. code-block:: none 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | OPTION_S46_DMR | option-length | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |dmr-prefix6-len| dmr-ipv6-prefix | +-+-+-+-+-+-+-+-+ (variable length) | . . +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Figure 4: S46 DMR Option option-code OPTION_S46_DMR (91) option-length 1 + length of dmr-ipv6-prefix specified in octets. dmr-prefix6-len 8 bits long; expresses the bitmask length of the IPv6 prefix specified in the dmr-ipv6-prefix field. Allowed values range from 0 to 128. dmr-ipv6-prefix a variable-length field specifying the IPv6 prefix or address for the BR. This field is right-padded with zeros to the nearest octet boundary when dmr-prefix6-len is not divisible by 8. """ option_type = OPTION_S46_DMR def __init__(self, dmr_prefix: IPv6Network = None): self.dmr_prefix = dmr_prefix
[docs] def validate(self): """ Validate that the contents of this object conform to protocol specs. """ if not isinstance(self.dmr_prefix, IPv6Network): raise ValueError("DMR prefix must be an IPv6Network")
[docs] def load_from(self, buffer: bytes, offset: int = 0, length: int = None) -> int: """ Load the internal state of this object from the given buffer. The buffer may contain more data after the structured element is parsed. This data is ignored. :param buffer: The buffer to read data from :param offset: The offset in the buffer where to start reading :param length: The amount of data we are allowed to read from the buffer :return: The number of bytes used from the buffer """ my_offset, option_len = self.parse_option_header(buffer, offset, length, min_length=1, max_length=17) header_offset = my_offset # IPv6 prefix ipv6_prefix_length = buffer[offset + my_offset] my_offset += 1 if not (0 <= ipv6_prefix_length <= 128): raise ValueError("IPv6 prefix length must be in range from 0 to 128") included_octets = math.ceil(ipv6_prefix_length / 8) ipv6_address = IPv6Address(buffer[offset + my_offset:offset + my_offset + included_octets].ljust(16, b'\x00')) my_offset += included_octets self.dmr_prefix = IPv6Network('{!s}/{:d}'.format(ipv6_address, ipv6_prefix_length), strict=False) max_offset = option_len + header_offset # The option_len field counts bytes *after* the header fields if my_offset != max_offset: raise ValueError('Option length does not match the combined length of the parsed options') return my_offset
[docs] def save(self) -> Union[bytes, bytearray]: """ Save the internal state of this object as a buffer. :return: The buffer with the data from this element """ # Store the minimal amount of octets for the IPv6 prefix included_octets = math.ceil(self.dmr_prefix.prefixlen / 8) ipv6_address_bytes = self.dmr_prefix.network_address.packed[:included_octets] buffer = bytearray() buffer.extend(pack('!HHB', self.option_type, 1 + included_octets, self.dmr_prefix.prefixlen)) buffer.extend(ipv6_address_bytes) return buffer
[docs]class S46V4V6BindingOption(Option): """ :rfc:`7598#section-4.4` The S46 IPv4/IPv6 Address Binding option (OPTION_S46_V4V6BIND) MAY be used to specify the full or shared IPv4 address of the CE. The IPv6 prefix field is used by the CE to identify the correct prefix to use for the tunnel source. .. code-block:: none 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | OPTION_S46_V4V6BIND | option-length | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | ipv4-address | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |bindprefix6-len| bind-ipv6-prefix | +-+-+-+-+-+-+-+-+ (variable length) | . . +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | | . S46_V4V6BIND-options . . . +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Figure 5: S46 IPv4/IPv6 Address Binding Option option-code OPTION_S46_V4V6BIND (92) option-length length of the option, excluding option-code and option-length fields, including length of all encapsulated options; expressed in octets. ipv4-address a fixed-length field of 4 octets specifying an IPv4 address. bindprefix6-len 8 bits long; expresses the bitmask length of the IPv6 prefix specified in the bind-ipv6-prefix field. Allowed values range from 0 to 128. bind-ipv6-prefix a variable-length field specifying the IPv6 prefix or address for the S46 CE. This field is right-padded with zeros to the nearest octet boundary when bindprefix6-len is not divisible by 8. S46_V4V6BIND-options a variable-length field that may contain zero or more options that specify additional parameters. This document specifies one such option: OPTION_S46_PORTPARAMS. """ option_type = OPTION_S46_V4V6BIND def __init__(self, ipv4_address: IPv4Address = None, ipv6_prefix: IPv6Network = None, options: Iterable[Option] = None): self.ipv4_address = ipv4_address or IPv4Address('0.0.0.0') self.ipv6_prefix = ipv6_prefix or IPv6Network('::/0') self.options = list(options or [])
[docs] def validate(self): """ Validate that the contents of this object conform to protocol specs. """ if not isinstance(self.ipv4_address, IPv4Address): raise ValueError("IPv4 address must be an IPv4Address") if not isinstance(self.ipv6_prefix, IPv6Network): raise ValueError("IPv6 prefix must be an IPv6Network") # Check if all options are allowed self.validate_contains(self.options) for option in self.options: option.validate()
[docs] def load_from(self, buffer: bytes, offset: int = 0, length: int = None) -> int: """ Load the internal state of this object from the given buffer. The buffer may contain more data after the structured element is parsed. This data is ignored. :param buffer: The buffer to read data from :param offset: The offset in the buffer where to start reading :param length: The amount of data we are allowed to read from the buffer :return: The number of bytes used from the buffer """ my_offset, option_len = self.parse_option_header(buffer, offset, length, min_length=5) header_offset = my_offset # IPv4 address self.ipv4_address = IPv4Address(buffer[offset + my_offset:offset + my_offset + 4]) my_offset += 4 # IPv6 prefix ipv6_prefix_length = buffer[offset + my_offset] my_offset += 1 if not (0 <= ipv6_prefix_length <= 128): raise ValueError("IPv6 prefix length must be in range from 0 to 128") included_octets = math.ceil(ipv6_prefix_length / 8) ipv6_address = IPv6Address(buffer[offset + my_offset:offset + my_offset + included_octets].ljust(16, b'\x00')) my_offset += included_octets self.ipv6_prefix = IPv6Network('{!s}/{:d}'.format(ipv6_address, ipv6_prefix_length), strict=False) # Parse the options self.options = [] max_offset = option_len + header_offset # The option_len field counts bytes *after* the header fields while max_offset > my_offset: used_buffer, option = Option.parse(buffer, offset=offset + my_offset) self.options.append(option) my_offset += used_buffer if my_offset != max_offset: raise ValueError('Option length does not match the combined length of the parsed options') return my_offset
[docs] def save(self) -> Union[bytes, bytearray]: """ Save the internal state of this object as a buffer. :return: The buffer with the data from this element """ # Store the minimal amount of octets for the IPv6 prefix included_octets = math.ceil(self.ipv6_prefix.prefixlen / 8) ipv6_address_bytes = self.ipv6_prefix.network_address.packed[:included_octets] # Store the options options_buffer = bytearray() for option in self.options: options_buffer.extend(option.save()) buffer = bytearray() buffer.extend(pack('!HH', self.option_type, len(options_buffer) + included_octets + 5)) buffer.extend(self.ipv4_address.packed) buffer.append(self.ipv6_prefix.prefixlen) buffer.extend(ipv6_address_bytes) buffer.extend(options_buffer) return buffer
[docs] def get_options_of_type(self, *args: Type[SomeOption]) -> List[SomeOption]: """ Get all options that are subclasses of the given class. :param args: The classes to look for :returns: The list of options """ classes = tuple(args) # noinspection PyTypeChecker return [option for option in self.options if isinstance(option, classes)]
[docs] def get_option_of_type(self, *args: Type[SomeOption]) -> Optional[SomeOption]: """ Get the first option that is a subclass of the given class. :param args: The classes to look for :returns: The option or None """ classes = tuple(args) for option in self.options: if isinstance(option, classes): # noinspection PyTypeChecker return option
[docs]class S46PortParametersOption(Option): """ :rfc:`7598#section-4.5` The S46 Port Parameters option (OPTION_S46_PORTPARAMS) specifies optional port set information that MAY be provided to CEs. See Section 5.1 of [RFC7597] for a description of the MAP algorithm and detailed explanation of all of the parameters. .. code-block:: none 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | OPTION_S46_PORTPARAMS | option-length | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | offset | PSID-len | PSID | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Figure 6: S46 Port Parameters Option option-code OPTION_S46_PORTPARAMS (93) option-length 4 offset Port Set Identifier (PSID) offset. 8 bits long; specifies the numeric value for the S46 algorithm's excluded port range/offset bits (a-bits), as per Section 5.1 of [RFC7597]. Allowed values are between 0 and 15. Default values for this field are specific to the softwire mechanism being implemented and are defined in the relevant specification document. PSID-len 8 bits long; specifies the number of significant bits in the PSID field (also known as 'k'). When set to 0, the PSID field is to be ignored. After the first 'a' bits, there are k bits in the port number representing the value of the PSID. Consequently, the address-sharing ratio would be 2^k. PSID 16 bits long. The PSID value algorithmically identifies a set of ports assigned to a CE. The first k bits on the left of this field contain the PSID binary value. The remaining (16 - k) bits on the right are padding zeros. When receiving the OPTION_S46_PORTPARAMS option with an explicit PSID, the client MUST use this explicit PSID when configuring its softwire interface. The OPTION_S46_PORTPARAMS option with an explicit PSID MUST be discarded if the S46 CE isn't configured with a full IPv4 address (e.g., IPv4 prefix). The OPTION_S46_PORTPARAMS option is contained within an OPTION_S46_RULE option or an OPTION_S46_V4V6BIND option. """ option_type = OPTION_S46_PORTPARAMS def __init__(self, offset: int = 0, psid_len: int = 0, psid: int = 0): self.offset = offset self.psid_len = psid_len self.psid = psid
[docs] def validate(self): """ Validate that the contents of this object conform to protocol specs. """ if not isinstance(self.offset, int) or not (0 <= self.offset <= 15): raise ValueError("Offset must be an unsigned 4 bit integer") if not isinstance(self.psid_len, int) or not (0 <= self.psid_len <= 16): raise ValueError("PSID length must be an integer in range from 0 to 16") if self.offset + self.psid_len > 16: raise ValueError("Offset and PSID length together must be 16 or less") if not isinstance(self.psid, int) or not (0 <= self.psid < 2 ** self.psid_len): raise ValueError("PSID must be an unsigned {} bit integer".format(self.psid_len))
[docs] def load_from(self, buffer: bytes, offset: int = 0, length: int = None) -> int: """ Load the internal state of this object from the given buffer. The buffer may contain more data after the structured element is parsed. This data is ignored. :param buffer: The buffer to read data from :param offset: The offset in the buffer where to start reading :param length: The amount of data we are allowed to read from the buffer :return: The number of bytes used from the buffer """ my_offset, option_len = self.parse_option_header(buffer, offset, length, min_length=4, max_length=4) self.offset, self.psid_len, raw_psid = unpack('!BBH', buffer[offset + my_offset:offset + my_offset + 4]) my_offset += 4 # Convert left-aligned bits to an integer self.psid = raw_psid >> (16 - self.psid_len) return my_offset
[docs] def save(self) -> Union[bytes, bytearray]: """ Save the internal state of this object as a buffer. :return: The buffer with the data from this element """ # Convert integer to left-aligned bits raw_psid = self.psid << (16 - self.psid_len) buffer = bytearray() buffer.extend(pack('!HHBBH', self.option_type, 4, self.offset, self.psid_len, raw_psid)) return buffer
[docs]class S46ContainerOption(Option): """ Common code for MAP-E, MAP-T and LW4over6 containers """ option_type = 0 def __init__(self, options: Iterable[Option] = None): self.options = list(options or [])
[docs] def validate(self): """ Validate that the contents of this object conform to protocol specs. """ # Check if all options are allowed self.validate_contains(self.options) for option in self.options: option.validate()
[docs] def load_from(self, buffer: bytes, offset: int = 0, length: int = None) -> int: """ Load the internal state of this object from the given buffer. The buffer may contain more data after the structured element is parsed. This data is ignored. :param buffer: The buffer to read data from :param offset: The offset in the buffer where to start reading :param length: The amount of data we are allowed to read from the buffer :return: The number of bytes used from the buffer """ my_offset, option_len = self.parse_option_header(buffer, offset, length, min_length=5) # Parse the options self.options = [] max_offset = option_len + my_offset while max_offset > my_offset: used_buffer, option = Option.parse(buffer, offset=offset + my_offset) self.options.append(option) my_offset += used_buffer if my_offset != max_offset: raise ValueError('Option length does not match the combined length of the parsed options') return my_offset
[docs] def save(self) -> Union[bytes, bytearray]: """ Save the internal state of this object as a buffer. :return: The buffer with the data from this element """ # Store the options options_buffer = bytearray() for option in self.options: options_buffer.extend(option.save()) buffer = bytearray() buffer.extend(pack('!HH', self.option_type, len(options_buffer))) buffer.extend(options_buffer) return buffer
[docs] def get_options_of_type(self, *args: Type[SomeOption]) -> List[SomeOption]: """ Get all options that are subclasses of the given class. :param args: The classes to look for :returns: The list of options """ classes = tuple(args) # noinspection PyTypeChecker return [option for option in self.options if isinstance(option, classes)]
[docs] def get_option_of_type(self, *args: Type[SomeOption]) -> Optional[SomeOption]: """ Get the first option that is a subclass of the given class. :param args: The classes to look for :returns: The option or None """ classes = tuple(args) for option in self.options: if isinstance(option, classes): # noinspection PyTypeChecker return option
[docs]class S46MapEContainerOption(S46ContainerOption): """ :rfc:`7598#section-5.1` The S46 MAP-E Container option (OPTION_S46_CONT_MAPE) specifies the container used to group all rules and optional port parameters for a specified domain. .. code-block:: none 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | OPTION_S46_CONT_MAPE | option-length | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | | . encapsulated-options (variable length) . . . +---------------------------------------------------------------+ Figure 7: S46 MAP-E Container Option option-code OPTION_S46_CONT_MAPE (94) option-length length of encapsulated options, expressed in octets. encapsulated-options options associated with this Softwire46 MAP-E domain. The encapsulated-options field conveys options specific to the OPTION_S46_CONT_MAPE option. Currently, there are two encapsulated options specified: OPTION_S46_RULE and OPTION_S46_BR. There MUST be at least one OPTION_S46_RULE option and at least one OPTION_S46_BR option. Other options applicable to a domain may be defined in the future. A DHCPv6 message MAY include multiple OPTION_S46_CONT_MAPE options (representing multiple domains). """ option_type = OPTION_S46_CONT_MAPE
[docs]class S46MapTContainerOption(S46ContainerOption): """ :rfc:`7598#section-5.2` The S46 MAP-T Container option (OPTION_S46_CONT_MAPT) specifies the container used to group all rules and optional port parameters for a specified domain. .. code-block:: none 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | OPTION_S46_CONT_MAPT | option-length | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | | . encapsulated-options (variable length) . . . +---------------------------------------------------------------+ Figure 8: S46 MAP-T Container Option option-code OPTION_S46_CONT_MAPT (95) option-length length of encapsulated options, expressed in octets. encapsulated-options options associated with this Softwire46 MAP-T domain. The encapsulated-options field conveys options specific to the OPTION_S46_CONT_MAPT option. Currently, there are two options specified: the OPTION_S46_RULE and OPTION_S46_DMR options. There MUST be at least one OPTION_S46_RULE option and exactly one OPTION_S46_DMR option. """ option_type = OPTION_S46_CONT_MAPT
[docs]class S46LWContainerOption(S46ContainerOption): """ :rfc:`7598#section-5.3` The S46 Lightweight 4over6 Container option (OPTION_S46_CONT_LW) specifies the container used to group all rules and optional port parameters for a specified domain. .. code-block:: none 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | OPTION_S46_CONT_LW | option-length | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | | + encapsulated-options (variable length) . . . +---------------------------------------------------------------+ Figure 9: S46 Lightweight 4over6 Container Option option-code OPTION_S46_CONT_LW (96) option-length length of encapsulated options, expressed in octets. encapsulated-options options associated with this Softwire46 Lightweight 4over6 domain. The encapsulated-options field conveys options specific to the OPTION_S46_CONT_LW option. Currently, there are two options specified: OPTION_S46_V4V6BIND and OPTION_S46_BR. There MUST be at most one OPTION_S46_V4V6BIND option and at least one OPTION_S46_BR option. """ option_type = OPTION_S46_CONT_LW
# Register where these options may occur SolicitMessage.add_may_contain(S46ContainerOption) AdvertiseMessage.add_may_contain(S46ContainerOption) RequestMessage.add_may_contain(S46ContainerOption) ConfirmMessage.add_may_contain(S46ContainerOption) RenewMessage.add_may_contain(S46ContainerOption) RebindMessage.add_may_contain(S46ContainerOption) ReleaseMessage.add_may_contain(S46ContainerOption) ReplyMessage.add_may_contain(S46ContainerOption) S46RuleOption.add_may_contain(S46PortParametersOption) S46V4V6BindingOption.add_may_contain(S46PortParametersOption) S46MapEContainerOption.add_may_contain(S46RuleOption, min_occurrence=1) S46MapEContainerOption.add_may_contain(S46BROption, min_occurrence=1) S46MapEContainerOption.add_may_contain(S46PortParametersOption) S46MapTContainerOption.add_may_contain(S46RuleOption, min_occurrence=1) S46MapTContainerOption.add_may_contain(S46DMROption, min_occurrence=1, max_occurrence=1) S46MapTContainerOption.add_may_contain(S46PortParametersOption) S46LWContainerOption.add_may_contain(S46V4V6BindingOption, min_occurrence=0, max_occurrence=1) S46LWContainerOption.add_may_contain(S46BROption, min_occurrence=1) S46LWContainerOption.add_may_contain(S46PortParametersOption)