kafka核心消费逻辑源码分析
张姿妤 2023-07-31编程经验
前言Kafka是一个分布式流平台,提供了高吞吐量的发布-订阅系统。它的核心消费逻辑是Kafka消费者的实现。Kafka消费者是一个多线程应用,每个消费者启动一个或多
前言
Kafka是一个分布式流平台,提供了高吞吐量的发布-订阅系统。它的核心消费逻辑是Kafka消费者的实现。Kafka消费者是一个多线程应用,每个消费者启动一个或多个线程来消费一个或多个分区中的消息。本文将从源码层面分析Kafka核心消费逻辑。消费者的核心线程
Kafka消费者的核心线程是KafkaConsumer类的poll方法,该方法负责从分区中拉取消息并返回给用户。具体实现如下:public ConsumerRecords<K, V> poll(long timeout) { acquireConsumerCoordinator(); if (subscriptions.hasNoSubscriptionOrUserAssignment()) throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); long remainingPollTime = timeout; long pollStartTime = time.milliseconds(); do { if (coordinator != null) { // 已经加入了消费者组 // 获取消费者组的高优先级任务,比如可以重新分配分区 // 这些任务会在heartbeat线程和rebalance线程中添加 // 协调器会负责分配这些任务 coordinator.updateTasks(); // 处理Kafka协议的请求和响应 client.poll(remainingPollTime); } // 检查是否存在需要处理的后台任务 // 例如心跳、元数据更新和分区分配 runPendingTasks(); // 从分区中拉取消息 pollOnce(remainingPollTime); // 核心逻辑 // 更新心跳和元数据信息 refreshMetadataIfNecessary(); // 处理一些外部中断或异常的情况 maybeWakeupForMetadataCheck(); remainingPollTime = timeout - (time.milliseconds() - pollStartTime); } while (remainingPollTime > 0 && shouldContinuePole()); // 返回获取到的消息 return records; }
消息的拉取
pollOnce方法是负责从分区中拉取消息的核心逻辑,具体实现如下:private void pollOnce(long timeout) { // 检查订阅的topic和分区是否发生变化 // 如果发生变化,会触发重新分配分区 client.ensureFreshMetadata(); // 获取消费者订阅的分区 Collection<TopicPartition> fetchablePartitions = fetcher.sendFetches(); if (!fetchablePartitions.isEmpty()) { // 发送Fetch请求给Broker获取消息 // 这里使用了Fetch请求的批量处理机制 fetcher.sendFetchRequest(fetchablePartitions); } // 如果有消息从Broker获取到 if (fetcher.hasCompletedFetches()) { // 依次处理每个分区的消息 for (completedFetch = fetcher.firstCompletedFetch(); completedFetch != null; completedFetch = fetcher.nextCompletedFetch()) { // 过滤掉已经提交的消息 if (subscriptions.isAssigned(completedFetch.partition)) { // 消息处理逻辑的核心方法 processCompletedFetch(completedFetch); // 更新每个分区已经消费的消息的offset subscriptions.updateCommittedOffset(completedFetch.partition, completedFetch.fetchOffset); } // 更新统计信息和日志 updateFetchMetrics(completedFetch); } } // 处理下次拉取消息的时间间隔 // 这里根据配置的参数值决定是否进行限制 // 以避免过于频繁的拉取消息 if (!subscriptions.hasAllFetchPositions()) { // 检查是否需要等待一段时间再重试拉取 // 例如如果有一些分区处于负载均衡阶段,则需要等待,以避免重复拉取和消费 long metadataTimeout = Math.min( coordinator.timeToNextTask(Poll.INTERVAL_MS), fetcher.metadataTimeoutMs() ); pollTimeout = metadataTimeout; } else { // 如果已经获取到了所有分区的消息,则根据参数设置的最大拉取间隔计算下次拉取时间 pollTimeout = Math.min( coordinator.timeToNextTask(Poll.INTERVAL_MS), fetcher.backoffTimeMs() ); } // 如果超过了传入的timeout,则强制退出 if (pollTimeout > timeout && timeout >= 0) pollTimeout = timeout; // 根据下次拉取的时间间隔进行等待 // 这里使用了Object.wait方法,等待心跳线程或IO线程的唤醒 client.poll(pollTimeout); }
总结
Kafka的核心消费逻辑源码分析可以从KafkaConsumer类的poll方法入手。该方法通过调用多个其他方法来实现消息的消费。其中pollOnce方法负责从分区中拉取消息,首先检查订阅的topic和分区是否发生变化,然后发送Fetch请求到Broker获取消息,获取到消息后,依次处理每个分区的消息,并更新每个分区已消费的offset,最后根据下次拉取的时间间隔进行等待。通过这个分析,我们可以了解Kafka消费者是如何实现消息拉取和处理的。 很赞哦! ()