Series directory
Kafka principles and practice (part 3): spring-kafka producer source code
Kafka principles and practice (part 4): spring-kafka consumer source code
1. The KafkaConsumer consumer model
As shown in the diagram above, the main flow of the spring-kafka consumer model is:
1. The container starts and polls for messages in a loop.
2. The KafkaConsumer pull flow:
1) The Fetcher builds fetch requests and stores them in the ConsumerNetworkClient's unsent queue.
2) ConsumerNetworkClient.poll() calls NetworkClient.send(), which takes each ClientRequest from the unsent queue, converts it into a RequestSend and places it on the KafkaChannel held by the Selector; the Selector then performs the actual network I/O that pulls the ConsumerRecords to be consumed from the Kafka cluster.
3. The message listener's MessageListener.onMessage() executes the user-defined consumption logic.
2. Constructing the KafkaConsumer
1 @SuppressWarnings("unchecked") 2 private KafkaConsumer(ConsumerConfig config, 3 Deserializer<K> keyDeserializer, 4 Deserializer<V> valueDeserializer) { 5 try { 6 log.debug("Starting the Kafka consumer"); 7 this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); 8 int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG); 9 int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG); 10 if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs) 11 throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG); 12 this.time = new SystemTime(); 13 14 String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); 15 if (clientId.length() <= 0) 16 clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement(); 17 this.clientId = clientId; 18 Map<String, String> metricsTags = new LinkedHashMap<>(); 19 metricsTags.put("client-id", clientId); 20 MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)) 21 .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) 22 .tags(metricsTags); 23 List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, 24 MetricsReporter.class); 25 reporters.add(new JmxReporter(JMX_PREFIX)); 26 this.metrics = new Metrics(metricConfig, reporters, time); 27 this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); 28 29 // load interceptors and make sure they get clientId 30 Map<String, Object> userProvidedConfigs = config.originals(); 31 userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); 32 List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, 33 ConsumerInterceptor.class); 34 this.interceptors = interceptorList.isEmpty() ? 
null : new ConsumerInterceptors<>(interceptorList); 35 if (keyDeserializer == null) { 36 this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 37 Deserializer.class); 38 this.keyDeserializer.configure(config.originals(), true); 39 } else { 40 config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); 41 this.keyDeserializer = keyDeserializer; 42 } 43 if (valueDeserializer == null) { 44 this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 45 Deserializer.class); 46 this.valueDeserializer.configure(config.originals(), false); 47 } else { 48 config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); 49 this.valueDeserializer = valueDeserializer; 50 } 51 ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList); 52 this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners); 53 List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); 54 this.metadata.update(Cluster.bootstrap(addresses), 0); 55 String metricGrpPrefix = "consumer"; 56 ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); 57 NetworkClient netClient = new NetworkClient( 58 new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder), 59 this.metadata, 60 clientId, 61 100, // a fixed large enough value will suffice 62 config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG), 63 config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG), 64 config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG), 65 config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time); 66 this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs, 67 config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)); 68 OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT)); 69 this.subscriptions = new SubscriptionState(offsetResetStrategy); 70 List<PartitionAssignor> assignors = config.getConfiguredInstances( 71 ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 72 PartitionAssignor.class); 73 this.coordinator = new ConsumerCoordinator(this.client, 74 config.getString(ConsumerConfig.GROUP_ID_CONFIG), 75 config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 76 config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 77 config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 78 assignors, 79 this.metadata, 80 this.subscriptions, 81 metrics, 82 metricGrpPrefix, 83 this.time, 84 retryBackoffMs, 85 new ConsumerCoordinator.DefaultOffsetCommitCallback(), 86 config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), 87 config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), 88 this.interceptors, 89 config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG)); 90 this.fetcher = new Fetcher<>(this.client, 91 config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG), 92 config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 93 config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), 94 config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 95 config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 96 config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG), 97 this.keyDeserializer, 98 this.valueDeserializer, 99 
this.metadata, 100 this.subscriptions, 101 metrics, 102 metricGrpPrefix, 103 this.time, 104 this.retryBackoffMs); 105 106 config.logUnused(); 107 AppInfoParser.registerAppInfo(JMX_PREFIX, clientId); 108 109 log.debug("Kafka consumer created"); 110 } catch (Throwable t) { 111 // call close methods if internal objects are already constructed 112 // this is to prevent resource leak. see KAFKA-2121 113 close(true); 114 // now propagate the exception 115 throw new KafkaException("Failed to construct kafka consumer", t); 116 } 117 }
Looking at the KafkaConsumer constructor, the core components are (a minimal usage sketch follows this list):
1. Metadata: encapsulates the metadata-handling logic. The metadata holds only a subset of topics, which can grow over time; requesting metadata for a topic we know nothing about triggers a metadata update. If topic expiry is enabled, any topic unused for longer than the expiry interval is dropped from the metadata refresh set after an update.
2. ConsumerNetworkClient: the consumer's higher-level access to the network layer, with basic support for request futures. The class is thread-safe, but provides no synchronization for response callbacks, which guarantees no locks are held when they are invoked.
3. SubscriptionState: maintains the offset state of the subscribed TopicPartitions.
4. ConsumerCoordinator: the consumer's coordinator, responsible for partition assignment and rebalancing.
5. Fetcher: fetches messages from the brokers according to the configuration.
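To see these components at work from the caller's side, here is a minimal raw-client poll loop. It is only a sketch: the broker address, group id and topic name are placeholder assumptions, and it is written against the 0.10.x client this constructor comes from (where poll() takes a timeout in milliseconds). Internally, each poll() drives the Fetcher, ConsumerNetworkClient and ConsumerCoordinator described above.

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RawConsumerDemo {

    public static void main(String[] args) {
        // broker address, group id and topic are placeholders
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo.topic"));
            while (true) {
                // poll() drives the Fetcher/ConsumerNetworkClient machinery described above
                ConsumerRecords<String, String> records = consumer.poll(1000); // 0.10.x signature: timeout in ms
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d key=%s value=%s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
```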
3. Consumer container startup flow
There are two common ways to set up a Kafka consumer:
1. An xml configuration file.
2. Annotation-based configuration.
Whichever you choose, the two differ only in how the Spring beans get defined. We will trace the source code through the xml approach; for contrast, a minimal annotation-based sketch follows.
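The sketch below shows the annotation-based equivalent. It assumes a ConsumerFactory bean like the consumerFactory defined in the xml that follows; the class name, topic and concurrency value are placeholders.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
@EnableKafka
public class AnnotationConsumerConfig {

    // assumes a ConsumerFactory bean equivalent to the consumerFactory bean in the xml below
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3); // same role as the concurrency property in step 5 of the xml
        return factory;
    }

    @KafkaListener(topics = "demo.topic") // placeholder topic
    public void onMessage(ConsumerRecord<String, String> record) {
        // user-defined consumption logic
    }
}
```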
The overall xml configuration:
```xml
<!-- 1. Define the consumer properties -->
<bean id="consumerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="${bootstrap.servers}" />
            <entry key="group.id" value="${group.id}" />
            <entry key="enable.auto.commit" value="${enable.auto.commit}" />
            <entry key="session.timeout.ms" value="${session.timeout.ms}" />
            <entry key="key.deserializer"
                   value="org.apache.kafka.common.serialization.StringDeserializer" />
            <entry key="value.deserializer"
                   value="org.apache.kafka.common.serialization.StringDeserializer" />
        </map>
    </constructor-arg>
</bean>

<!-- 2. Create the consumerFactory bean -->
<bean id="consumerFactory"
      class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
        <ref bean="consumerProperties" />
    </constructor-arg>
</bean>

<!-- 3. Define the consumer implementation class -->
<bean id="kafkaConsumerService" class="xxx.service.impl.KafkaConsumerServiceImpl" />

<!-- 4. Consumer container configuration -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
    <!-- topics -->
    <constructor-arg name="topics">
        <list>
            <value>${kafka.consumer.topic.credit.for.lease}</value>
            <value>${loan.application.feedback.topic}</value>
            <value>${templar.agreement.feedback.topic}</value>
            <value>${templar.aggrement.active.feedback.topic}</value>
            <value>${templar.aggrement.agreementRepaid.topic}</value>
            <value>${templar.aggrement.agreementWithhold.topic}</value>
            <value>${templar.aggrement.agreementRepayRemind.topic}</value>
        </list>
    </constructor-arg>
    <property name="messageListener" ref="kafkaConsumerService" />
</bean>

<!-- 5. Concurrent message listener container; init-method invokes doStart() -->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart">
    <constructor-arg ref="consumerFactory" />
    <constructor-arg ref="containerProperties" />
    <property name="concurrency" value="${concurrency}" />
</bean>
```
The configuration breaks down into five steps.
3.1. Define the consumer properties bean
consumerProperties is nothing more than a Map<key, value> of consumer configuration.
3.2. Create the consumerFactory bean
DefaultKafkaConsumerFactory implements the ConsumerFactory interface, which offers two methods: one to create a consumer and one to report whether offsets are auto-committed. The factory is constructed with consumerProperties as its argument.
```java
public interface ConsumerFactory<K, V> {

    Consumer<K, V> createConsumer();

    boolean isAutoCommit();

}
```
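For comparison with step 2 of the xml, here is a minimal sketch of building the same factory in Java; the broker address, group id and timeout value are placeholder assumptions.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

public class ConsumerFactoryDemo {

    public static ConsumerFactory<String, String> demoConsumerFactory() {
        // placeholder values; mirrors the consumerProperties map from the xml
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}
```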
3.3. Define the consumer implementation class
Write a custom class implementing the MessageListener interface (the interface design is shown below) and implement its onMessage() method to consume the received messages. There are two options, both sketched after this list:
1) MessageListener: the offset is committed automatically after a message is consumed (when enable.auto.commit=true). More efficient, but carries the risk of advancing the offset even though consumption failed.
2) AcknowledgingMessageListener: the offset is committed manually after a message is consumed (when enable.auto.commit=false). Less efficient, but no risk of advancing the offset on a failed consumption.
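A minimal sketch of the two listener styles; the class names are hypothetical and the method bodies stand in for real business logic.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.Acknowledgment;

// Option 1: auto-commit (enable.auto.commit=true)
class AutoCommitListener implements MessageListener<String, String> {
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        // business logic; the offset is committed automatically
    }
}

// Option 2: manual commit (enable.auto.commit=false)
class ManualAckListener implements AcknowledgingMessageListener<String, String> {
    @Override
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
        // business logic first, then explicitly commit the offset
        ack.acknowledge();
    }
}
```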
3.4. Listener container configuration
ContainerProperties holds the runtime configuration of a listener container: chiefly the topics, partitions and initial offsets to listen on, plus the message listener itself.
```java
public class ContainerProperties {

    private static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;

    private static final int DEFAULT_QUEUE_DEPTH = 1;

    private static final int DEFAULT_PAUSE_AFTER = 10000;

    /**
     * Topic names. The array of topics to listen to.
     */
    private final String[] topics;

    /**
     * Topic pattern. The pattern of topics to listen to.
     */
    private final Pattern topicPattern;

    /**
     * Topics/partitions/initial offsets.
     */
    private final TopicPartitionInitialOffset[] topicPartitions;

    /**
     * The ack mode (used when enable.auto.commit is false):
     * <ul>
     * <li>1. RECORD: ack after each record has been passed to the listener</li>
     * <li>2. BATCH: ack after each batch of records received from a poll has been passed to the listener</li>
     * <li>3. TIME: ack after the configured time (ms) has elapsed (should be greater than
     * {@code #setPollTimeout(long) pollTimeout})</li>
     * <li>4. COUNT: ack after the configured number of records has been received</li>
     * <li>5. MANUAL: the listener is responsible for acking (AcknowledgingMessageListener)</li>
     * </ul>
     */
    private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH;

    /**
     * The number of outstanding record count after which offsets should be
     * committed when {@link AckMode#COUNT} or {@link AckMode#COUNT_TIME} is being
     * used.
     */
    private int ackCount;

    /**
     * The time (ms) after which outstanding offsets should be committed when
     * {@link AckMode#TIME} or {@link AckMode#COUNT_TIME} is being used. Should be
     * larger than zero.
     */
    private long ackTime;

    /**
     * The message listener; must be either a MessageListener or an AcknowledgingMessageListener.
     */
    private Object messageListener;

    /**
     * The max time to block in the consumer waiting for records.
     */
    private volatile long pollTimeout = 1000;

    /**
     * The executor for the threads that poll the consumer.
     */
    private AsyncListenableTaskExecutor consumerTaskExecutor;

    /**
     * The executor for the threads that invoke the listener.
     */
    private AsyncListenableTaskExecutor listenerTaskExecutor;

    /**
     * The error callback, invoked when the listener throws an exception.
     */
    private GenericErrorHandler<?> errorHandler;

    /**
     * When using Kafka group management and {@link #setPauseEnabled(boolean)} is
     * true, the delay after which the consumer should be paused. Default 10000.
     */
    private long pauseAfter = DEFAULT_PAUSE_AFTER;

    /**
     * When true, avoids rebalancing when this consumer is slow or throws a
     * qualifying exception - pauses the consumer. Default: true.
     * @see #pauseAfter
     */
    private boolean pauseEnabled = true;

    /**
     * Set the queue depth for handoffs from the consumer thread to the listener
     * thread. Default 1 (up to 2 in process).
     */
    private int queueDepth = DEFAULT_QUEUE_DEPTH;

    /**
     * The timeout for shutting down the container.
     */
    private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;

    /**
     * A user-defined {@link ConsumerRebalanceListener} implementation.
     */
    private ConsumerRebalanceListener consumerRebalanceListener;

    /**
     * The commit callback; by default, commits are simply logged.
     */
    private OffsetCommitCallback commitCallback;

    /**
     * Whether or not to call consumer.commitSync() or commitAsync() when the
     * container is responsible for commits. Default true. See
     * https://github.com/spring-projects/spring-kafka/issues/62 At the time of
     * writing, async commits are not entirely reliable.
     */
    private boolean syncCommits = true;

    private boolean ackOnError = true;

    private Long idleEventInterval;

    public ContainerProperties(String... topics) {
        Assert.notEmpty(topics, "An array of topicPartitions must be provided");
        this.topics = Arrays.asList(topics).toArray(new String[topics.length]);
        this.topicPattern = null;
        this.topicPartitions = null;
    }

    public ContainerProperties(Pattern topicPattern) {
        this.topics = null;
        this.topicPattern = topicPattern;
        this.topicPartitions = null;
    }

    public ContainerProperties(TopicPartitionInitialOffset... topicPartitions) {
        this.topics = null;
        this.topicPattern = null;
        Assert.notEmpty(topicPartitions, "An array of topicPartitions must be provided");
        this.topicPartitions = new LinkedHashSet<>(Arrays.asList(topicPartitions))
                .toArray(new TopicPartitionInitialOffset[topicPartitions.length]);
    }

    // ... getters and setters omitted

}
```
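As a counterpart to step 4 of the xml, here is a minimal sketch of building ContainerProperties in Java. It reuses the hypothetical ManualAckListener from the previous sketch; the topic name and timeout are placeholders.

```java
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;

public class ContainerPropertiesDemo {

    public static ContainerProperties demoContainerProperties() {
        ContainerProperties containerProps = new ContainerProperties("demo.topic"); // placeholder topic
        containerProps.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL); // listener acks explicitly
        containerProps.setPollTimeout(3000); // max ms to block waiting for records
        containerProps.setMessageListener(new ManualAckListener()); // listener from the sketch above
        return containerProps;
    }
}
```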
3.5. Start the concurrent message listener container
The core class is ConcurrentMessageListenerContainer, which extends the abstract class AbstractMessageListenerContainer; the class diagram is shown below.
As the diagram shows, AbstractMessageListenerContainer has two subclasses, one single-threaded (KafkaMessageListenerContainer) and one multi-threaded (ConcurrentMessageListenerContainer); multi-threaded consumption is recommended. Looking at ConcurrentMessageListenerContainer, two members deserve attention:
1. The constructor, whose arguments are a consumer factory (ConsumerFactory) plus the container configuration (ContainerProperties).
2. doStart(), whose core move is to call start() on each delegate KafkaMessageListenerContainer. Source:
```java
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {

    private final ConsumerFactory<K, V> consumerFactory;

    private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();

    private int concurrency = 1;

    /**
     * Construct an instance with the supplied configuration properties.
     * The topic partitions are distributed evenly across the delegate
     * {@link KafkaMessageListenerContainer}s.
     * @param consumerFactory the consumer factory.
     * @param containerProperties the container properties.
     */
    public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
            ContainerProperties containerProperties) {
        super(containerProperties);
        Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
        this.consumerFactory = consumerFactory;
    }

    public int getConcurrency() {
        return this.concurrency;
    }

    /**
     * The maximum number of concurrent {@link KafkaMessageListenerContainer}s running.
     * Messages from within the same partition will be processed sequentially.
     * @param concurrency the concurrency.
     */
    public void setConcurrency(int concurrency) {
        Assert.isTrue(concurrency > 0, "concurrency must be greater than 0");
        this.concurrency = concurrency;
    }

    /**
     * Return the list of {@link KafkaMessageListenerContainer}s created by
     * this container.
     * @return the list of {@link KafkaMessageListenerContainer}s created by
     * this container.
     */
    public List<KafkaMessageListenerContainer<K, V>> getContainers() {
        return Collections.unmodifiableList(this.containers);
    }

    /*
     * Under lifecycle lock.
     */
    @Override
    protected void doStart() {
        if (!isRunning()) {
            ContainerProperties containerProperties = getContainerProperties();
            TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
            // if explicit partitions were provided, concurrency must not exceed the partition count
            if (topicPartitions != null
                    && this.concurrency > topicPartitions.length) {
                this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
                        + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                        + topicPartitions.length);
                this.concurrency = topicPartitions.length; // concurrency is capped at the partition count
            }
            setRunning(true);
            // create one delegate listener container per concurrency slot
            for (int i = 0; i < this.concurrency; i++) {
                KafkaMessageListenerContainer<K, V> container;
                if (topicPartitions == null) {
                    container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties);
                }
                else {
                    container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties,
                            partitionSubset(containerProperties, i));
                }
                if (getBeanName() != null) {
                    container.setBeanName(getBeanName() + "-" + i);
                }
                if (getApplicationEventPublisher() != null) {
                    container.setApplicationEventPublisher(getApplicationEventPublisher());
                }
                container.setClientIdSuffix("-" + i);
                container.start(); // core call: start the delegate container
                this.containers.add(container);
            }
        }
    }

    // ... remainder omitted

}
```
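Putting the pieces together, a minimal sketch of creating and starting the container programmatically, reusing the hypothetical demoConsumerFactory() and demoContainerProperties() helpers from the earlier sketches:

```java
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

public class ContainerStartupDemo {

    public static void startDemoContainer() {
        ConcurrentMessageListenerContainer<String, String> container =
                new ConcurrentMessageListenerContainer<>(
                        ConsumerFactoryDemo.demoConsumerFactory(),
                        ContainerPropertiesDemo.demoContainerProperties());
        container.setConcurrency(3); // creates three delegate KafkaMessageListenerContainers
        container.setBeanName("demoListenerContainer");
        container.start(); // eventually runs doStart() under the lifecycle lock
    }
}
```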
Tracing onward, container.start() runs through AbstractMessageListenerContainer's start(); it is worth noting that start() and stop() synchronize on the same lock, which guards the container's lifecycle.
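A simplified sketch, not the verbatim spring-kafka source, of that lifecycle-lock pattern: both transitions take the same monitor, so a start can never interleave with a stop.

```java
// simplified illustration of the locking in AbstractMessageListenerContainer
public abstract class LifecycleSketch {

    // single monitor shared by start() and stop()
    private final Object lifecycleMonitor = new Object();

    public final void start() {
        synchronized (this.lifecycleMonitor) {
            doStart(); // subclass hook, e.g. ConcurrentMessageListenerContainer.doStart()
        }
    }

    public final void stop() {
        synchronized (this.lifecycleMonitor) {
            doStop(); // subclass hook
        }
    }

    protected abstract void doStart();

    protected abstract void doStop();
}
```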