Kafka Principles and Practice (4): spring-kafka Consumer Source Code


Series contents

Kafka Principles and Practice (1) Principles: a 10-Minute Introduction

Kafka Principles and Practice (2) A Simple spring-kafka Practice

Kafka Principles and Practice (3) spring-kafka Producer Source Code

Kafka Principles and Practice (4) spring-kafka Consumer Source Code

Kafka Principles and Practice (5) spring-kafka Configuration Explained

Kafka Principles and Practice (6) Summary and Reflections

============== Main content =====================

1. The KafkaConsumer consumption model

The main flow of the spring-kafka consumer model (illustrated by a diagram in the original post) is:

1. The container starts and polls for messages in a loop.

2. KafkaConsumer fetches messages as follows:

1) The Fetcher builds fetch requests and stores them in the ConsumerNetworkClient's unsent queue.

2) ConsumerNetworkClient.poll() calls NetworkClient.send(), which takes each ClientRequest from the unsent queue, converts it to a RequestSend, and hands it to the KafkaChannel inside the Selector; Selector.send() then pulls the records to be consumed (ConsumerRecords) from the Kafka cluster.

3. The message listener MessageListener.onMessage() executes the user-defined consumption logic.
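Conceptually, what the listener container drives is the plain KafkaConsumer poll loop. A minimal sketch, assuming a 0.10.x-era kafka-clients API (broker address, group, and topic below are placeholders, not values from this article):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoopSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "demo-group");              // placeholder group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
            while (true) {
                // poll() drives the Fetcher/ConsumerNetworkClient machinery described above
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // the container would dispatch each record to MessageListener.onMessage()
                    System.out.printf("offset=%d, key=%s, value=%s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}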

2. Constructing the KafkaConsumer

@SuppressWarnings("unchecked")
private KafkaConsumer(ConsumerConfig config,
                      Deserializer<K> keyDeserializer,
                      Deserializer<V> valueDeserializer) {
    try {
        log.debug("Starting the Kafka consumer");
        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
        int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
        if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
            throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
        this.time = new SystemTime();

        String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
        if (clientId.length() <= 0)
            clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
        this.clientId = clientId;
        Map<String, String> metricsTags = new LinkedHashMap<>();
        metricsTags.put("client-id", clientId);
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                .tags(metricsTags);
        List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));
        this.metrics = new Metrics(metricConfig, reporters, time);
        this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);

        // load interceptors and make sure they get clientId
        Map<String, Object> userProvidedConfigs = config.originals();
        userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ConsumerInterceptor.class);
        this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
        if (keyDeserializer == null) {
            this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    Deserializer.class);
            this.keyDeserializer.configure(config.originals(), true);
        } else {
            config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
            this.keyDeserializer = keyDeserializer;
        }
        if (valueDeserializer == null) {
            this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    Deserializer.class);
            this.valueDeserializer.configure(config.originals(), false);
        } else {
            config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
            this.valueDeserializer = valueDeserializer;
        }
        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
        this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners);
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
        this.metadata.update(Cluster.bootstrap(addresses), 0);
        String metricGrpPrefix = "consumer";
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
        NetworkClient netClient = new NetworkClient(
                new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
                this.metadata,
                clientId,
                100, // a fixed large enough value will suffice
                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
        this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
                config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
        OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
        this.subscriptions = new SubscriptionState(offsetResetStrategy);
        List<PartitionAssignor> assignors = config.getConfiguredInstances(
                ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                PartitionAssignor.class);
        this.coordinator = new ConsumerCoordinator(this.client,
                config.getString(ConsumerConfig.GROUP_ID_CONFIG),
                config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
                config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
                assignors,
                this.metadata,
                this.subscriptions,
                metrics,
                metricGrpPrefix,
                this.time,
                retryBackoffMs,
                new ConsumerCoordinator.DefaultOffsetCommitCallback(),
                config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
                config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
                this.interceptors,
                config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
        this.fetcher = new Fetcher<>(this.client,
                config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
                config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
                config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
                config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
                config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
                this.keyDeserializer,
                this.valueDeserializer,
                this.metadata,
                this.subscriptions,
                metrics,
                metricGrpPrefix,
                this.time,
                this.retryBackoffMs);

        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);

        log.debug("Kafka consumer created");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed
        // this is to prevent resource leak. see KAFKA-2121
        close(true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka consumer", t);
    }
}

The KafkaConsumer constructor reveals the core components:

1. Metadata: encapsulates the logic around cluster metadata. Only a subset of topics is retained, and topics can be added over time. Requesting metadata for a topic we have none for triggers a metadata update. If topic expiry is enabled, any topic unused within the expiry interval is dropped from the metadata refresh set after an update.

2. ConsumerNetworkClient: the consumer's higher-level access to the network layer, with basic support for request futures. The class is thread-safe, but it does not synchronize response callbacks, which guarantees no locks are held when they are invoked.

3. SubscriptionState: maintains the offset state of the subscribed TopicPartitions.

4. ConsumerCoordinator: the consumer-side coordinator, responsible for partition assignment and rebalancing.

5. Fetcher: fetches messages from the brokers according to the configuration.

3. Consumer container startup flow

There are two common ways to set up a Kafka consumer:

1. XML configuration

2. Annotation-based configuration

Either way, the only real difference is how the Spring beans are produced. A minimal annotation-based example is sketched below for comparison; we will trace the source code through the XML route.
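The annotation route boils down to a listener method like this sketch (the topic name and class are placeholders; an @EnableKafka configuration class with a listener container factory is assumed to exist):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class AnnotationConsumerSketch {

    // @EnableKafka plus a KafkaListenerContainerFactory bean are assumed
    // to be configured elsewhere; spring-kafka builds the listener
    // container for this method automatically.
    @KafkaListener(topics = "demo-topic")
    public void onMessage(String message) {
        System.out.println("received: " + message);
    }
}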

The overall XML configuration is as follows:

<!-- 1. Define the consumer properties -->
<bean id="consumerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="${bootstrap.servers}" />
            <entry key="group.id" value="${group.id}" />
            <entry key="enable.auto.commit" value="${enable.auto.commit}" />
            <entry key="session.timeout.ms" value="${session.timeout.ms}" />
            <entry key="key.deserializer"
                value="org.apache.kafka.common.serialization.StringDeserializer" />
            <entry key="value.deserializer"
                value="org.apache.kafka.common.serialization.StringDeserializer" />
        </map>
    </constructor-arg>
</bean>

<!-- 2. Create the consumerFactory bean -->
<bean id="consumerFactory"
    class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
    <constructor-arg>
        <ref bean="consumerProperties" />
    </constructor-arg>
</bean>

<!-- 3. Define the message-consuming implementation class -->
<bean id="kafkaConsumerService" class="xxx.service.impl.KafkaConsumerSerivceImpl" />

<!-- 4. Listener container configuration -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
    <!-- topics -->
    <constructor-arg name="topics">
        <list>
            <value>${kafka.consumer.topic.credit.for.lease}</value>
            <value>${loan.application.feedback.topic}</value>
            <value>${templar.agreement.feedback.topic}</value>
            <value>${templar.aggrement.active.feedback.topic}</value>
            <value>${templar.aggrement.agreementRepaid.topic}</value>
            <value>${templar.aggrement.agreementWithhold.topic}</value>
            <value>${templar.aggrement.agreementRepayRemind.topic}</value>
        </list>
    </constructor-arg>
    <property name="messageListener" ref="kafkaConsumerService" />
</bean>

<!-- 5. Concurrent message listener container; its init-method runs doStart() -->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
    <constructor-arg ref="consumerFactory" />
    <constructor-arg ref="containerProperties" />
    <property name="concurrency" value="${concurrency}" />
</bean>

This breaks down into five steps:

3.1 Define the consumer properties bean

consumerProperties is simply a Map of key/value configuration entries.

3.2 Create the consumerFactory bean

DefaultKafkaConsumerFactory implements the ConsumerFactory interface, which provides two methods: one to create a consumer and one to check whether auto-commit is enabled. It is constructed with consumerProperties as its argument.
public interface ConsumerFactory<K, V> {

    Consumer<K, V> createConsumer();

    boolean isAutoCommit();

}
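For illustration, the same factory can be assembled programmatically from a properties map; a sketch (the values below are placeholders, not recommendations):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

public class ConsumerFactorySketch {

    public static void main(String[] args) {
        // same entries as the consumerProperties bean in the XML above
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "demo-group");              // placeholder
        props.put("enable.auto.commit", "true");
        props.put("session.timeout.ms", "15000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        DefaultKafkaConsumerFactory<String, String> factory =
                new DefaultKafkaConsumerFactory<>(props);
        Consumer<String, String> consumer = factory.createConsumer();
        System.out.println("auto-commit enabled: " + factory.isAutoCommit());
        consumer.close();
    }
}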

3.3 Define the consumer implementation class

Define a custom class that implements the MessageListener interface (its design is shown in the class diagram in the original post), and implement the onMessage() method to consume the received messages. There are two variants:

1) MessageListener: the offset is committed automatically after a message is consumed (when enable.auto.commit=true). This improves throughput, but carries the risk of advancing the offset even when consumption fails.

2) AcknowledgingMessageListener: the offset is committed manually after a message is consumed (when enable.auto.commit=false). This lowers throughput, but removes the risk of advancing the offset on a failed consumption.

Minimal sketches of both variants follow.
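The sketches assume the spring-kafka 1.x listener interfaces; the class names are placeholders, not part of the framework:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.Acknowledgment;

// Variant 1: offsets are auto-committed (enable.auto.commit=true)
class AutoCommitListenerSketch implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println("consumed: " + record.value());
    }
}

// Variant 2: offsets are acknowledged manually (enable.auto.commit=false,
// with a manual AckMode on the container)
class ManualAckListenerSketch implements AcknowledgingMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        System.out.println("consumed: " + record.value());
        acknowledgment.acknowledge(); // commit the offset explicitly
    }
}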

3.4 Listener container configuration

ContainerProperties holds the runtime configuration of a listener container. It mainly defines the topics, partitions, and initial offsets to listen on, plus the message listener.
public class ContainerProperties {

    private static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;

    private static final int DEFAULT_QUEUE_DEPTH = 1;

    private static final int DEFAULT_PAUSE_AFTER = 10000;

    /**
     * Topic names. The array of topics to listen on.
     */
    private final String[] topics;

    /**
     * Topic pattern. The pattern of topics to listen on.
     */
    private final Pattern topicPattern;

    /**
     * Topics/partitions/initial offsets.
     */
    private final TopicPartitionInitialOffset[] topicPartitions;

    /**
     * The ack mode (used when the auto-ack property is false):
     * <ul>
     * <li>1. RECORD: ack after each record has been passed to the listener</li>
     * <li>2. BATCH: ack after each batch of records has been received and passed to the listener</li>
     * <li>3. TIME: ack after the configured number of milliseconds has elapsed (should be greater than
     * {@code #setPollTimeout(long) pollTimeout})</li>
     * <li>4. COUNT: ack after the configured number of records has been received</li>
     * <li>5. MANUAL: the listener is responsible for acking (use an AcknowledgingMessageListener)</li>
     * </ul>
     */
    private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH;

    /**
     * The number of outstanding record count after which offsets should be
     * committed when {@link AckMode#COUNT} or {@link AckMode#COUNT_TIME} is being
     * used.
     */
    private int ackCount;

    /**
     * The time (ms) after which outstanding offsets should be committed when
     * {@link AckMode#TIME} or {@link AckMode#COUNT_TIME} is being used. Should be
     * larger than zero.
     */
    private long ackTime;

    /**
     * The message listener; must be either a MessageListener or an
     * AcknowledgingMessageListener.
     */
    private Object messageListener;

    /**
     * The max time to block in the consumer waiting for records.
     */
    private volatile long pollTimeout = 1000;

    /**
     * The executor for the threads that poll the consumer.
     */
    private AsyncListenableTaskExecutor consumerTaskExecutor;

    /**
     * The executor for the threads that invoke the listener.
     */
    private AsyncListenableTaskExecutor listenerTaskExecutor;

    /**
     * The error handler to call when the listener throws an exception.
     */
    private GenericErrorHandler<?> errorHandler;

    /**
     * When using Kafka group management and {@link #setPauseEnabled(boolean)} is
     * true, the delay after which the consumer should be paused. Default 10000.
     */
    private long pauseAfter = DEFAULT_PAUSE_AFTER;

    /**
     * When true, avoids rebalancing when this consumer is slow or throws a
     * qualifying exception - pauses the consumer. Default: true.
     * @see #pauseAfter
     */
    private boolean pauseEnabled = true;

    /**
     * Set the queue depth for handoffs from the consumer thread to the listener
     * thread. Default 1 (up to 2 in process).
     */
    private int queueDepth = DEFAULT_QUEUE_DEPTH;

    /**
     * The timeout for shutting down the container.
     */
    private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;

    /**
     * A user-defined ConsumerRebalanceListener implementation.
     */
    private ConsumerRebalanceListener consumerRebalanceListener;

    /**
     * The commit callback; by default a simple logging callback.
     */
    private OffsetCommitCallback commitCallback;

    /**
     * Whether or not to call consumer.commitSync() or commitAsync() when the
     * container is responsible for commits. Default true. See
     * https://github.com/spring-projects/spring-kafka/issues/62 At the time of
     * writing, async commits are not entirely reliable.
     */
    private boolean syncCommits = true;

    private boolean ackOnError = true;

    private Long idleEventInterval;

    public ContainerProperties(String... topics) {
        Assert.notEmpty(topics, "An array of topicPartitions must be provided");
        this.topics = Arrays.asList(topics).toArray(new String[topics.length]);
        this.topicPattern = null;
        this.topicPartitions = null;
    }

    public ContainerProperties(Pattern topicPattern) {
        this.topics = null;
        this.topicPattern = topicPattern;
        this.topicPartitions = null;
    }

    public ContainerProperties(TopicPartitionInitialOffset... topicPartitions) {
        this.topics = null;
        this.topicPattern = null;
        Assert.notEmpty(topicPartitions, "An array of topicPartitions must be provided");
        this.topicPartitions = new LinkedHashSet<>(Arrays.asList(topicPartitions))
                .toArray(new TopicPartitionInitialOffset[topicPartitions.length]);
    }

    // ... getters and setters omitted

}
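For illustration, this step can also be wired in code; a minimal sketch, assuming spring-kafka 1.x (the topic name and listener body are placeholders):

import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;

public class ContainerPropertiesSketch {

    public static void main(String[] args) {
        // listen on one placeholder topic
        ContainerProperties containerProperties = new ContainerProperties("demo-topic");

        // the listener must be a MessageListener or AcknowledgingMessageListener
        containerProperties.setMessageListener(
                (MessageListener<String, String>) record ->
                        System.out.println("consumed: " + record.value()));

        containerProperties.setAckMode(AckMode.BATCH); // the default ack mode
        containerProperties.setPollTimeout(1000);      // max time to block in poll()
    }
}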

 

3.5 Start the concurrent message listener container

The core class is ConcurrentMessageListenerContainer, which extends the abstract class AbstractMessageListenerContainer (see the class diagram in the original post).

As the class diagram shows, AbstractMessageListenerContainer has two implementations, one single-threaded and one multi-threaded; multi-threaded consumption is recommended. Two parts of ConcurrentMessageListenerContainer deserve attention:

1. The constructor, which takes a ConsumerFactory plus a ContainerProperties.

2. doStart(), whose core is calling start() on each delegate KafkaMessageListenerContainer. Source:

public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {

    private final ConsumerFactory<K, V> consumerFactory;

    private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();

    private int concurrency = 1;

    /**
     * Construct an instance with the supplied configuration properties.
     * The topic partitions are distributed evenly across the delegate
     * {@link KafkaMessageListenerContainer}s.
     * @param consumerFactory the consumer factory.
     * @param containerProperties the container properties.
     */
    public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
            ContainerProperties containerProperties) {
        super(containerProperties);
        Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
        this.consumerFactory = consumerFactory;
    }

    public int getConcurrency() {
        return this.concurrency;
    }

    /**
     * The maximum number of concurrent {@link KafkaMessageListenerContainer}s running.
     * Messages from within the same partition will be processed sequentially.
     * @param concurrency the concurrency.
     */
    public void setConcurrency(int concurrency) {
        Assert.isTrue(concurrency > 0, "concurrency must be greater than 0");
        this.concurrency = concurrency;
    }

    /**
     * Return the list of {@link KafkaMessageListenerContainer}s created by
     * this container.
     * @return the list of {@link KafkaMessageListenerContainer}s created by
     * this container.
     */
    public List<KafkaMessageListenerContainer<K, V>> getContainers() {
        return Collections.unmodifiableList(this.containers);
    }

    /*
     * Under lifecycle lock.
     */
    @Override
    protected void doStart() {
        if (!isRunning()) {
            ContainerProperties containerProperties = getContainerProperties();
            TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
            if (topicPartitions != null // with explicit partitions, validate concurrency against the partition count
                    && this.concurrency > topicPartitions.length) {
                this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
                        + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                        + topicPartitions.length);
                this.concurrency = topicPartitions.length; // concurrency can be at most the number of partitions
            }
            setRunning(true);
            // create one listener container per concurrency slot
            for (int i = 0; i < this.concurrency; i++) {
                KafkaMessageListenerContainer<K, V> container;
                if (topicPartitions == null) {
                    container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties);
                }
                else {
                    container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties,
                            partitionSubset(containerProperties, i));
                }
                if (getBeanName() != null) {
                    container.setBeanName(getBeanName() + "-" + i);
                }
                if (getApplicationEventPublisher() != null) {
                    container.setApplicationEventPublisher(getApplicationEventPublisher());
                }
                container.setClientIdSuffix("-" + i);
                container.start(); // core call: start the delegate container
                this.containers.add(container);
            }
        }
    }

    // ... remaining methods omitted
}

Following the call chain, container.start() invokes AbstractMessageListenerContainer's doStart(). Notably, start() and stop() synchronize on the same lock, which guards the container's lifecycle. A simplified sketch of the pattern follows.
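This is a minimal sketch of that lifecycle-lock pattern, not the verbatim spring-kafka source:

// Sketch of the lifecycle-lock pattern described above; names and
// structure are simplified for illustration.
public abstract class LifecycleLockSketch {

    private final Object lifecycleMonitor = new Object();

    private volatile boolean running;

    public void start() {
        synchronized (this.lifecycleMonitor) { // same lock as stop()
            if (!this.running) {
                doStart(); // subclasses set running=true here
            }
        }
    }

    public void stop() {
        synchronized (this.lifecycleMonitor) { // same lock as start()
            if (this.running) {
                doStop();
            }
        }
    }

    protected abstract void doStart();

    protected abstract void doStop();

    protected void setRunning(boolean running) {
        this.running = running;
    }

    public boolean isRunning() {
        return this.running;
    }
}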
