乐鱼电竞

  • 教育行业A股IPO第一股(股票代码 003032)

    全国咨询/投诉热线:400-618-4000

    Kafka中的HW、LEO等分别代表什么?

    更新时间:2023年10月19日11时37分 来源:乐鱼电竞 浏览次数:

    好口碑IT培训

      在Apache Kafka中,HW(High Watermark)和 LEO(Log End Offset)是与分区的复制和消息传递相关的两个关键概念。

      1.HW(High Watermark):

      High Watermark是一个分区的消息复制进度的指示器。它表示了已经成功复制到所有副本的消息的位置。HW之前的所有消息都被认为是已提交的消息,这意味着消费者可以安全地消费这些消息。HW通常是消费者组维护的偏移量的参考点。

      2.LEO(Log End Offset):

      Log End Offset表示一个分区中消息日志的最后一个位置,即下一条消息要写入的位置。LEO是动态变化的,因为消息不断被追加到分区。它表示了分区中的最新消息位置。

      接下来笔者用一段具体的示例代码,来演示下如何使用Java和Kafka Consumer API来获取分区的HW和LEO:

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class KafkaHWLEOExample {
        public static void main(String[] args) {
            // 设置Kafka消费者的配置
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    
            // 创建Kafka消费者
            Consumer<String, String> consumer = new KafkaConsumer<>(props);
    
            // 指定要订阅的主题
            String topic = "my-topic";
            consumer.subscribe(Collections.singletonList(topic));
    
            // 获取分区信息
            PartitionInfo partitionInfo = consumer.partitionsFor(topic).get(0);
            int partition = partitionInfo.partition();
    
            // 在消费者循环中获取HW和LEO
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (TopicPartition topicPartition : records.partitions()) {
                    long hw = consumer.position(topicPartition); // 获取HW
                    long leo = consumer.endOffsets(Collections.singleton(topicPartition)).get(topicPartition); // 获取LEO
                    System.out.println("Partition " + topicPartition.partition() + ": HW = " + hw + ", LEO = " + leo);
                }
            }
        }
    }

      上面的代码创建了一个Kafka消费者,并订阅了一个主题。在消费者循环中,我们使用position()方法来获取分区的HW,并使用endOffsets()方法来获取分区的LEO。这可以帮助我们监视分区的消息复制进度和消息日志的结束位置。

    0 分享到:
    和我们在线交谈!
    【网站地图】【sitemap】