"""
Messages that are sent over Kafka
"""
from struct import unpack, pack
from dhcpkit.ipv6.messages import Message
from dhcpkit.protocol_element import ProtocolElement
from typing import Union
# Constants for the message direction
DHCP_MESSAGE = 1
[docs]class KafkaMessage(ProtocolElement):
"""
The base class for Kafka messages.
:type message_type: int
"""
# These needs to be overwritten in subclasses
message_type = 0
[docs] @classmethod
def determine_class(cls, buffer: bytes, offset: int = 0) -> type:
"""
Return the appropriate subclass from the registry, or UnknownClientServerMessage if no subclass is registered.
:param buffer: The buffer to read data from
:param offset: The offset in the buffer where to start reading
:return: The best known class for this message data
"""
from dhcpkit_kafka.message_registry import kafka_message_registry
message_type = buffer[offset]
return kafka_message_registry.get(message_type, UnknownKafkaMessage)
[docs]class UnknownKafkaMessage(KafkaMessage):
"""
Container for raw message content for cases where we don't know how to decode the message.
:type message_data: bytes
"""
def __init__(self, message_type: int = 0, message_data: bytes = b''):
super().__init__()
self.message_type = message_type
self.message_data = message_data
[docs] def validate(self):
"""
Validate that the contents of this object conform to protocol specs.
"""
# Check if the data is bytes
if not isinstance(self.message_type, int) or not (0 <= self.message_type < 2 ** 8):
raise ValueError("Message type must be an unsigned 8 bit integer")
# Check if the data is bytes
if not isinstance(self.message_data, bytes):
raise ValueError("Message data must be a sequence of bytes")
[docs] def load_from(self, buffer: bytes, offset: int = 0, length: int = None) -> int:
"""
Load the internal state of this object from the given buffer. The buffer may contain more data after the
structured element is parsed. This data is ignored.
:param buffer: The buffer to read data from
:param offset: The offset in the buffer where to start reading
:param length: The amount of data we are allowed to read from the buffer
:return: The number of bytes used from the buffer
"""
my_offset = 0
# Message always begin with a message type
self.message_type = buffer[offset + my_offset]
my_offset += 1
max_length = length or (len(buffer) - offset)
message_data_len = max_length - my_offset
self.message_data = buffer[offset + my_offset:offset + my_offset + message_data_len]
my_offset += message_data_len
self.validate()
return my_offset
[docs] def save(self) -> Union[bytes, bytearray]:
"""
Save the internal state of this object as a buffer.
:return: The buffer with the data from this element
"""
self.validate()
buffer = bytearray()
buffer.append(self.message_type)
buffer.extend(self.message_data)
return buffer
[docs]class DHCPKafkaMessage(KafkaMessage):
"""
A message for publishing DHCPv6 messages over Kafka for analysis.
.. code-block:: none
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| msg-type | name-len | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ .
. server-name (variable length) .
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| timestamp-in |
| (double float) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| |
. .
. message-in .
. (variable length) .
. .
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| timestamp-out |
| (double float) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| |
. .
. message-out .
. (variable length) .
. .
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
"""
message_type = DHCP_MESSAGE
def __init__(self, server_name: str = '',
timestamp_in: Union[int, float] = 0, message_in: Message = None,
timestamp_out: Union[int, float] = 0, message_out: Message = None):
super().__init__()
self.server_name = server_name
self.timestamp_in = timestamp_in
self.message_in = message_in
self.timestamp_out = timestamp_out
self.message_out = message_out
[docs] def validate(self):
"""
Validate that the contents of this object
"""
# Check if the server name can be encoded into 255 bytes
if len(self.server_name.encode('utf-8')) > 255:
raise ValueError("The server name encoded as UTF-8 must be 255 bytes or less")
# Check if the timestamp is a signed 64-bit integer
if not isinstance(self.timestamp_in, (int, float)):
raise ValueError("Incoming timestamp must be float or integer")
# Check the messages
if self.message_in is not None and not isinstance(self.message_in, Message):
raise ValueError("Incoming message is not a valid DHCPv6 message")
# Check if the timestamp is a signed 64-bit integer
if not isinstance(self.timestamp_out, (int, float)):
raise ValueError("Outgoing timestamp must be float or integer")
if self.message_out is not None and not isinstance(self.message_out, Message):
raise ValueError("Outgoing message is not a valid DHCPv6 message")
# noinspection PyUnusedLocal
[docs] def load_from(self, buffer: bytes, offset: int = 0, length: int = None) -> int:
"""
Load the internal state of this object from the given buffer.
:param buffer: The buffer to read data from
:param offset: The offset in the buffer where to start reading
:param length: The amount of data we are allowed to read from the buffer
:return: The number of bytes used from the buffer
"""
my_offset = 0
# These message types always begin with a message type and a transaction id
message_type = buffer[offset + my_offset]
my_offset += 1
if message_type != self.message_type:
raise ValueError('The provided buffer does not contain {} data'.format(self.__class__.__name__))
# Parse the server name
server_name_length = buffer[my_offset]
my_offset += 1
self.server_name = buffer[my_offset:my_offset + server_name_length].decode('utf-8')
my_offset += server_name_length
# Read the incoming timestamp
self.timestamp_in = unpack('!d', buffer[my_offset:my_offset + 8])[0]
my_offset += 8
# Read the incoming packet
message_in_len = int(unpack('!H', buffer[my_offset:my_offset + 2])[0])
my_offset += 2
if message_in_len:
packet_length, self.message_in = Message.parse(buffer, my_offset, message_in_len)
my_offset += packet_length
else:
self.message_in = None
# Read the outgoing timestamp
self.timestamp_out = unpack('!d', buffer[my_offset:my_offset + 8])[0]
my_offset += 8
# Read the outgoing message
message_out_len = int(unpack('!H', buffer[my_offset:my_offset + 2])[0])
my_offset += 2
if message_out_len:
packet_length, self.message_out = Message.parse(buffer, my_offset, message_out_len)
my_offset += packet_length
else:
self.message_out = None
self.validate()
return my_offset
[docs] def save(self) -> Union[bytes, bytearray]:
"""
Save the internal state of this object as a buffer.
:return: The buffer with the data from this element
"""
self.validate()
buffer = bytearray()
buffer.append(self.message_type)
server_name_bytes = self.server_name.encode('utf-8')
buffer.append(len(server_name_bytes))
buffer.extend(server_name_bytes)
# Incoming message
message_bytes = self.message_in.save() if self.message_in else b''
# Incoming timestamp and message
buffer.extend(pack('!dH', self.timestamp_in, len(message_bytes)))
buffer.extend(message_bytes)
# Outgoing message
message_bytes = self.message_out.save() if self.message_out else b''
# Outgoing timestamp and message
buffer.extend(pack('!dH', self.timestamp_out, len(message_bytes)))
buffer.extend(message_bytes)
return buffer