c语言编程笔录

首页 >   > 笔记大全

笔记大全

kafka核心消费逻辑源码分析

更新时间:2023-10-19

前言

Kafka是一个分布式流平台,提供了高吞吐量的发布-订阅系统。它的核心消费逻辑是Kafka消费者的实现。Kafka消费者是一个多线程应用,每个消费者启动一个或多个线程来消费一个或多个分区中的消息。本文将从源码层面分析Kafka核心消费逻辑。

消费者的核心线程

Kafka消费者的核心线程是KafkaConsumer类的poll方法,该方法负责从分区中拉取消息并返回给用户。具体实现如下:
public ConsumerRecords<K, V> poll(long timeout) {
    acquireConsumerCoordinator();
    if (subscriptions.hasNoSubscriptionOrUserAssignment())
        throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");

    long remainingPollTime = timeout;
    long pollStartTime = time.milliseconds();
    do {
        if (coordinator != null) { // 已经加入了消费者组
            // 获取消费者组的高优先级任务,比如可以重新分配分区
            // 这些任务会在heartbeat线程和rebalance线程中添加
            // 协调器会负责分配这些任务
            coordinator.updateTasks();
            // 处理Kafka协议的请求和响应
            client.poll(remainingPollTime);
        }
        // 检查是否存在需要处理的后台任务
        // 例如心跳、元数据更新和分区分配
        runPendingTasks();
        // 从分区中拉取消息
        pollOnce(remainingPollTime); // 核心逻辑
        // 更新心跳和元数据信息
        refreshMetadataIfNecessary();
        // 处理一些外部中断或异常的情况
        maybeWakeupForMetadataCheck();
        remainingPollTime = timeout - (time.milliseconds() - pollStartTime);
    } while (remainingPollTime > 0 && shouldContinuePole());
    // 返回获取到的消息
    return records;
}

消息的拉取

pollOnce方法是负责从分区中拉取消息的核心逻辑,具体实现如下:
private void pollOnce(long timeout) {
    // 检查订阅的topic和分区是否发生变化
    // 如果发生变化,会触发重新分配分区
    client.ensureFreshMetadata();

    // 获取消费者订阅的分区
    Collection<TopicPartition> fetchablePartitions = fetcher.sendFetches();
    if (!fetchablePartitions.isEmpty()) {
        // 发送Fetch请求给Broker获取消息
        // 这里使用了Fetch请求的批量处理机制
        fetcher.sendFetchRequest(fetchablePartitions);
    }

    // 如果有消息从Broker获取到
    if (fetcher.hasCompletedFetches()) {
        // 依次处理每个分区的消息
        for (completedFetch = fetcher.firstCompletedFetch(); completedFetch != null; completedFetch = fetcher.nextCompletedFetch()) {
            // 过滤掉已经提交的消息
            if (subscriptions.isAssigned(completedFetch.partition)) {
                // 消息处理逻辑的核心方法
                processCompletedFetch(completedFetch);
                // 更新每个分区已经消费的消息的offset
                subscriptions.updateCommittedOffset(completedFetch.partition, completedFetch.fetchOffset);
            }
            // 更新统计信息和日志
            updateFetchMetrics(completedFetch);
        }
    }

    // 处理下次拉取消息的时间间隔
    // 这里根据配置的参数值决定是否进行限制
    // 以避免过于频繁的拉取消息
    if (!subscriptions.hasAllFetchPositions()) {
        // 检查是否需要等待一段时间再重试拉取
        // 例如如果有一些分区处于负载均衡阶段,则需要等待,以避免重复拉取和消费
        long metadataTimeout = Math.min(
            coordinator.timeToNextTask(Poll.INTERVAL_MS),
            fetcher.metadataTimeoutMs()
        );
        pollTimeout = metadataTimeout;
    } else {
        // 如果已经获取到了所有分区的消息,则根据参数设置的最大拉取间隔计算下次拉取时间
        pollTimeout = Math.min(
            coordinator.timeToNextTask(Poll.INTERVAL_MS),
            fetcher.backoffTimeMs()
        );
    }

    // 如果超过了传入的timeout,则强制退出
    if (pollTimeout > timeout && timeout >= 0)
        pollTimeout = timeout;

    // 根据下次拉取的时间间隔进行等待
    // 这里使用了Object.wait方法,等待心跳线程或IO线程的唤醒
    client.poll(pollTimeout);
}

总结

Kafka的核心消费逻辑源码分析可以从KafkaConsumer类的poll方法入手。该方法通过调用多个其他方法来实现消息的消费。其中pollOnce方法负责从分区中拉取消息,首先检查订阅的topic和分区是否发生变化,然后发送Fetch请求到Broker获取消息,获取到消息后,依次处理每个分区的消息,并更新每个分区已消费的offset,最后根据下次拉取的时间间隔进行等待。通过这个分析,我们可以了解Kafka消费者是如何实现消息拉取和处理的。