前言
Kafka是一个分布式流平台,提供了高吞吐量的发布-订阅系统。它的核心消费逻辑是Kafka消费者的实现。Kafka消费者是一个多线程应用,每个消费者启动一个或多个线程来消费一个或多个分区中的消息。本文将从源码层面分析Kafka核心消费逻辑。
消费者的核心线程
Kafka消费者的核心线程是KafkaConsumer类的poll方法,该方法负责从分区中拉取消息并返回给用户。具体实现如下:
public ConsumerRecords<K, V> poll(long timeout) {
acquireConsumerCoordinator();
if (subscriptions.hasNoSubscriptionOrUserAssignment())
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
long remainingPollTime = timeout;
long pollStartTime = time.milliseconds();
do {
if (coordinator != null) { // 已经加入了消费者组
// 获取消费者组的高优先级任务,比如可以重新分配分区
// 这些任务会在heartbeat线程和rebalance线程中添加
// 协调器会负责分配这些任务
coordinator.updateTasks();
// 处理Kafka协议的请求和响应
client.poll(remainingPollTime);
}
// 检查是否存在需要处理的后台任务
// 例如心跳、元数据更新和分区分配
runPendingTasks();
// 从分区中拉取消息
pollOnce(remainingPollTime); // 核心逻辑
// 更新心跳和元数据信息
refreshMetadataIfNecessary();
// 处理一些外部中断或异常的情况
maybeWakeupForMetadataCheck();
remainingPollTime = timeout - (time.milliseconds() - pollStartTime);
} while (remainingPollTime > 0 && shouldContinuePole());
// 返回获取到的消息
return records;
}
消息的拉取
pollOnce方法是负责从分区中拉取消息的核心逻辑,具体实现如下:
private void pollOnce(long timeout) {
// 检查订阅的topic和分区是否发生变化
// 如果发生变化,会触发重新分配分区
client.ensureFreshMetadata();
// 获取消费者订阅的分区
Collection<TopicPartition> fetchablePartitions = fetcher.sendFetches();
if (!fetchablePartitions.isEmpty()) {
// 发送Fetch请求给Broker获取消息
// 这里使用了Fetch请求的批量处理机制
fetcher.sendFetchRequest(fetchablePartitions);
}
// 如果有消息从Broker获取到
if (fetcher.hasCompletedFetches()) {
// 依次处理每个分区的消息
for (completedFetch = fetcher.firstCompletedFetch(); completedFetch != null; completedFetch = fetcher.nextCompletedFetch()) {
// 过滤掉已经提交的消息
if (subscriptions.isAssigned(completedFetch.partition)) {
// 消息处理逻辑的核心方法
processCompletedFetch(completedFetch);
// 更新每个分区已经消费的消息的offset
subscriptions.updateCommittedOffset(completedFetch.partition, completedFetch.fetchOffset);
}
// 更新统计信息和日志
updateFetchMetrics(completedFetch);
}
}
// 处理下次拉取消息的时间间隔
// 这里根据配置的参数值决定是否进行限制
// 以避免过于频繁的拉取消息
if (!subscriptions.hasAllFetchPositions()) {
// 检查是否需要等待一段时间再重试拉取
// 例如如果有一些分区处于负载均衡阶段,则需要等待,以避免重复拉取和消费
long metadataTimeout = Math.min(
coordinator.timeToNextTask(Poll.INTERVAL_MS),
fetcher.metadataTimeoutMs()
);
pollTimeout = metadataTimeout;
} else {
// 如果已经获取到了所有分区的消息,则根据参数设置的最大拉取间隔计算下次拉取时间
pollTimeout = Math.min(
coordinator.timeToNextTask(Poll.INTERVAL_MS),
fetcher.backoffTimeMs()
);
}
// 如果超过了传入的timeout,则强制退出
if (pollTimeout > timeout && timeout >= 0)
pollTimeout = timeout;
// 根据下次拉取的时间间隔进行等待
// 这里使用了Object.wait方法,等待心跳线程或IO线程的唤醒
client.poll(pollTimeout);
}
总结
Kafka的核心消费逻辑源码分析可以从KafkaConsumer类的poll方法入手。该方法通过调用多个其他方法来实现消息的消费。其中pollOnce方法负责从分区中拉取消息,首先检查订阅的topic和分区是否发生变化,然后发送Fetch请求到Broker获取消息,获取到消息后,依次处理每个分区的消息,并更新每个分区已消费的offset,最后根据下次拉取的时间间隔进行等待。通过这个分析,我们可以了解Kafka消费者是如何实现消息拉取和处理的。